Merge pull request #760 from obscuren/develop

core: transaction fixes
This commit is contained in:
Jeffrey Wilcke 2015-04-21 03:14:38 -07:00
commit 4ad8b28794
20 changed files with 45 additions and 4053 deletions

View File

@ -1,911 +0,0 @@
package blockpool
import (
"fmt"
"math/big"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/pow"
)
var (
// max number of block hashes sent in one request
blockHashesBatchSize = 256
// max number of blocks sent in one request
blockBatchSize = 64
// interval between two consecutive block checks (and requests)
blocksRequestInterval = 3 * time.Second
// level of redundancy in block requests sent
blocksRequestRepetition = 1
// interval between two consecutive block hash checks (and requests)
blockHashesRequestInterval = 3 * time.Second
// max number of idle iterations, ie., check through a section without new blocks coming in
blocksRequestMaxIdleRounds = 20
// timeout interval: max time allowed for peer without sending a block hash
blockHashesTimeout = 60 * time.Second
// timeout interval: max time allowed for peer without sending a block
blocksTimeout = 60 * time.Second
// timeout interval: max time allowed for best peer to remain idle (not send new block after sync complete)
idleBestPeerTimeout = 60 * time.Second
// duration of suspension after peer fatal error during which peer is not allowed to reconnect
peerSuspensionInterval = 300 * time.Second
// status is logged every statusUpdateInterval
statusUpdateInterval = 3 * time.Second
//
nodeCacheSize = 1000
)
// blockpool config, values default to constants
type Config struct {
BlockHashesBatchSize int
BlockBatchSize int
BlocksRequestRepetition int
BlocksRequestMaxIdleRounds int
NodeCacheSize int
BlockHashesRequestInterval time.Duration
BlocksRequestInterval time.Duration
BlockHashesTimeout time.Duration
BlocksTimeout time.Duration
IdleBestPeerTimeout time.Duration
PeerSuspensionInterval time.Duration
StatusUpdateInterval time.Duration
}
// blockpool errors
const (
ErrInvalidBlock = iota
ErrInvalidPoW
ErrInsufficientChainInfo
ErrIdleTooLong
ErrIncorrectTD
ErrUnrequestedBlock
)
// error descriptions
var errorToString = map[int]string{
ErrInvalidBlock: "Invalid block", // fatal
ErrInvalidPoW: "Invalid PoW", // fatal
ErrInsufficientChainInfo: "Insufficient chain info", // fatal
ErrIdleTooLong: "Idle too long", // fatal
ErrIncorrectTD: "Incorrect Total Difficulty", // should be fatal, not now temporarily
ErrUnrequestedBlock: "Unrequested block",
}
// error severity
func severity(code int) logger.LogLevel {
switch code {
case ErrIncorrectTD:
return logger.WarnLevel
case ErrUnrequestedBlock:
return logger.WarnLevel
default:
return logger.ErrorLevel
}
}
// init initialises the Config, zero values fall back to constants
func (self *Config) init() {
if self.BlockHashesBatchSize == 0 {
self.BlockHashesBatchSize = blockHashesBatchSize
}
if self.BlockBatchSize == 0 {
self.BlockBatchSize = blockBatchSize
}
if self.BlocksRequestRepetition == 0 {
self.BlocksRequestRepetition = blocksRequestRepetition
}
if self.BlocksRequestMaxIdleRounds == 0 {
self.BlocksRequestMaxIdleRounds = blocksRequestMaxIdleRounds
}
if self.BlockHashesRequestInterval == 0 {
self.BlockHashesRequestInterval = blockHashesRequestInterval
}
if self.BlocksRequestInterval == 0 {
self.BlocksRequestInterval = blocksRequestInterval
}
if self.BlockHashesTimeout == 0 {
self.BlockHashesTimeout = blockHashesTimeout
}
if self.BlocksTimeout == 0 {
self.BlocksTimeout = blocksTimeout
}
if self.IdleBestPeerTimeout == 0 {
self.IdleBestPeerTimeout = idleBestPeerTimeout
}
if self.PeerSuspensionInterval == 0 {
self.PeerSuspensionInterval = peerSuspensionInterval
}
if self.NodeCacheSize == 0 {
self.NodeCacheSize = nodeCacheSize
}
if self.StatusUpdateInterval == 0 {
self.StatusUpdateInterval = statusUpdateInterval
}
}
// node is the basic unit of the internal model of block chain/tree in the blockpool
type node struct {
lock sync.RWMutex
hash common.Hash
block *types.Block
hashBy string
blockBy string
peers map[string]bool
td *big.Int
}
type index struct {
int
}
// entry is the struct kept and indexed in the pool
type entry struct {
node *node
section *section
index *index
}
type BlockPool struct {
Config *Config
// the minimal interface with blockchain manager
hasBlock func(hash common.Hash) bool // query if block is known
insertChain func(types.Blocks) error // add section to blockchain
verifyPoW func(pow.Block) bool // soft PoW verification
chainEvents *event.TypeMux // ethereum eventer for chainEvents
tdSub event.Subscription // subscription to core.ChainHeadEvent
td *big.Int // our own total difficulty
pool map[common.Hash]*entry // the actual blockpool
peers *peers // peers manager in peers.go
status *status // info about blockpool (UI interface) in status.go
lock sync.RWMutex
chainLock sync.RWMutex
// alloc-easy pool of hash slices
hashSlicePool chan []common.Hash
nodeCache map[common.Hash]*node
nodeCacheLock sync.RWMutex
nodeCacheList []common.Hash
// waitgroup is used in tests to wait for result-critical routines
// as well as in determining idle / syncing status
wg sync.WaitGroup //
quit chan bool // chan used for quitting parallel routines
running bool //
}
// public constructor
// after blockpool returned, config can be set
// BlockPool.Start will call Config.init to set missing values
func New(
hasBlock func(hash common.Hash) bool,
insertChain func(types.Blocks) error,
verifyPoW func(pow.Block) bool,
chainEvents *event.TypeMux,
td *big.Int,
) *BlockPool {
return &BlockPool{
Config: &Config{},
hasBlock: hasBlock,
insertChain: insertChain,
verifyPoW: verifyPoW,
chainEvents: chainEvents,
td: td,
}
}
// allows restart
func (self *BlockPool) Start() {
self.lock.Lock()
defer self.lock.Unlock()
if self.running {
return
}
// set missing values
self.Config.init()
self.hashSlicePool = make(chan []common.Hash, 150)
self.nodeCache = make(map[common.Hash]*node)
self.status = newStatus()
self.quit = make(chan bool)
self.pool = make(map[common.Hash]*entry)
self.running = true
self.peers = &peers{
errors: &errs.Errors{
Package: "Blockpool",
Errors: errorToString,
Level: severity,
},
peers: make(map[string]*peer),
blacklist: make(map[string]time.Time),
status: self.status,
bp: self,
}
// subscribe and listen to core.ChainHeadEvent{} for uptodate TD
self.tdSub = self.chainEvents.Subscribe(core.ChainHeadEvent{})
// status update interval
timer := time.NewTicker(self.Config.StatusUpdateInterval)
go func() {
for {
select {
case <-self.quit:
return
case event := <-self.tdSub.Chan():
if ev, ok := event.(core.ChainHeadEvent); ok {
td := ev.Block.Td
var height *big.Int
if (ev.Block.HeaderHash == common.Hash{}) {
height = ev.Block.Header().Number
}
glog.V(logger.Detail).Infof("ChainHeadEvent: height: %v, td: %v, hash: %s", height, td, hex(ev.Block.Hash()))
self.setTD(td)
self.peers.lock.Lock()
if best := self.peers.best; best != nil {
// only switch if we strictly go above otherwise we may stall if only
if td.Cmp(best.td) > 0 {
self.peers.best = nil
self.switchPeer(best, nil)
}
}
self.peers.lock.Unlock()
}
case <-timer.C:
glog.V(logger.Detail).Infof("status:\n%v", self.Status())
}
}
}()
glog.V(logger.Info).Infoln("Blockpool started")
}
func (self *BlockPool) Stop() {
self.lock.Lock()
if !self.running {
self.lock.Unlock()
return
}
self.running = false
self.lock.Unlock()
glog.V(logger.Info).Infoln("Stopping...")
self.tdSub.Unsubscribe()
close(self.quit)
self.lock.Lock()
self.peers = nil
self.pool = nil
self.lock.Unlock()
glog.V(logger.Info).Infoln("Stopped")
}
// Wait blocks until active processes finish
func (self *BlockPool) Wait(t time.Duration) {
self.lock.Lock()
if !self.running {
self.lock.Unlock()
return
}
self.lock.Unlock()
glog.V(logger.Info).Infoln("Waiting for processes to complete...")
w := make(chan bool)
go func() {
self.wg.Wait()
close(w)
}()
select {
case <-w:
glog.V(logger.Info).Infoln("Processes complete")
case <-time.After(t):
glog.V(logger.Warn).Infoln("Timeout")
}
}
/*
AddPeer is called by the eth protocol instance running on the peer after
the status message has been received with total difficulty and current block hash
Called a second time with the same peer id, it is used to update chain info for a peer.
This is used when a new (mined) block message is received.
RemovePeer needs to be called when the peer disconnects.
Peer info is currently not persisted across disconnects (or sessions) except for suspension
*/
func (self *BlockPool) AddPeer(
td *big.Int, currentBlockHash common.Hash,
peerId string,
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (best bool, suspended bool) {
return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError)
}
// RemovePeer needs to be called when the peer disconnects
func (self *BlockPool) RemovePeer(peerId string) {
self.peers.removePeer(peerId, true)
}
/*
AddBlockHashes
Entry point for eth protocol to add block hashes received via BlockHashesMsg
Only hashes from the best peer are handled
Initiates further hash requests until a known parent is reached (unless cancelled by a peerSwitch event, i.e., when a better peer becomes best peer)
Launches all block request processes on each chain section
The first argument is an iterator function. Using this block hashes are decoded from the rlp message payload on demand. As a result, AddBlockHashes needs to run synchronously for one peer since the message is discarded if the caller thread returns.
*/
func (self *BlockPool) AddBlockHashes(next func() (common.Hash, bool), peerId string) {
bestpeer, best := self.peers.getPeer(peerId)
if !best {
return
}
// bestpeer is still the best peer
self.wg.Add(1)
defer func() { self.wg.Done() }()
self.status.lock.Lock()
self.status.activePeers[bestpeer.id]++
self.status.lock.Unlock()
var n int
var hash common.Hash
var ok, headSection, peerswitch bool
var sec, child, parent *section
var entry *entry
var nodes []*node
hash, ok = next()
bestpeer.lock.RLock()
glog.V(logger.Debug).Infof("AddBlockHashes: peer <%s> starting from [%s] (peer head: %s)", peerId, hex(bestpeer.parentHash), hex(bestpeer.currentBlockHash))
// first check if we are building the head section of a peer's chain
if bestpeer.parentHash == hash {
if self.hasBlock(bestpeer.currentBlockHash) {
bestpeer.lock.RUnlock()
return
}
/*
When peer is promoted in switchPeer, a new header section process is launched.
Once the head section skeleton is actually created here, it is signaled to the process
so that it can quit.
In the special case that the node for parent of the head block is found in the blockpool
(with or without fetched block), a singleton section containing only the head block node is created.
*/
headSection = true
if entry := self.get(bestpeer.currentBlockHash); entry == nil {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(bestpeer.parentHash))
// if head block is not yet in the pool, create entry and start node list for section
self.nodeCacheLock.Lock()
n := self.findOrCreateNode(bestpeer.currentBlockHash, peerId)
n.block = bestpeer.currentBlock
n.blockBy = peerId
n.td = bestpeer.td
self.nodeCacheLock.Unlock()
// nodes is a list of nodes in one section ordered top-bottom (old to young)
nodes = append(nodes, n)
} else {
// otherwise set child section iff found node is the root of a section
// this is a possible scenario when a singleton head section was created
// on an earlier occasion when this peer or another with the same block was best peer
if entry.node == entry.section.bottom {
child = entry.section
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s>: connects to child section root %s", peerId, hex(bestpeer.currentBlockHash))
}
}
} else {
// otherwise : we are not building the head section of the peer
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) section starting from [%s] ", peerId, hex(bestpeer.currentBlockHash), hex(hash))
}
// the switch channel signals peerswitch event
bestpeer.lock.RUnlock()
// iterate over hashes coming from peer (first round we have hash set above)
LOOP:
for ; ok; hash, ok = next() {
n++
select {
case <-self.quit:
// global quit for blockpool
return
case <-bestpeer.switchC:
// if the peer is demoted, no more hashes read
glog.V(logger.Detail).Infof("AddBlockHashes: demoted peer <%s> (head: %s)", peerId, hex(bestpeer.currentBlockHash), hex(hash))
peerswitch = true
break LOOP
default:
}
// if we reach the blockchain we stop reading further blockhashes
if self.hasBlock(hash) {
// check if known block connecting the downloaded chain to our blockchain
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
if len(nodes) == 1 {
glog.V(logger.Detail).Infof("AddBlockHashes: singleton section pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", peerId, hex(bestpeer.currentBlockHash), hex(hash))
// create new section if needed and push it to the blockchain
sec = self.newSection(nodes)
sec.addSectionToBlockChain(bestpeer)
} else {
/*
not added hash yet but according to peer child section built
earlier chain connects with blockchain
this maybe a potential vulnarability
the root block arrives (or already there but its parenthash was not pointing to known block in the blockchain)
we start inserting -> error -> remove the entire chain
instead of punishing this peer
solution: when switching peers always make sure best peers own head block
and td together with blockBy are recorded on the node
*/
if len(nodes) == 0 && child != nil {
glog.V(logger.Detail).Infof("AddBlockHashes: child section [%s] pushed to blockchain peer <%s> (head: %s) found block %s in the blockchain", sectionhex(child), peerId, hex(bestpeer.currentBlockHash), hex(hash))
child.addSectionToBlockChain(bestpeer)
}
}
break LOOP
}
// look up node in the pool
entry = self.get(hash)
if entry != nil {
// reached a known chain in the pool
if entry.node == entry.section.bottom && n == 1 {
/*
The first block hash received is an orphan node in the pool
This also supports clients that (despite the spec) include <from> hash in their
response to hashes request. Note that by providing <from> we can link sections
without having to wait for the root block of the child section to arrive, so it allows for superior performance.
*/
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found head block [%s] as root of connecting child section [%s] skipping", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
// record the entry's chain section as child section
child = entry.section
continue LOOP
}
// otherwise record entry's chain section as parent connecting it to the pool
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) found block [%s] in section [%s]. Connected to pool.", peerId, hex(bestpeer.currentBlockHash), hex(hash), sectionhex(entry.section))
parent = entry.section
break LOOP
}
// finally if node for block hash does not exist, create it and append node to section nodes
self.nodeCacheLock.Lock()
nodes = append(nodes, self.findOrCreateNode(hash, peerId))
self.nodeCacheLock.Unlock()
} //for
/*
we got here if
- run out of hashes (parent = nil) sent by our best peer
- our peer is demoted (peerswitch = true)
- reached blockchain or blockpool
- quitting
*/
self.chainLock.Lock()
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): %v nodes in new section", peerId, hex(bestpeer.currentBlockHash), len(nodes))
/*
Handle forks where connecting node is mid-section by splitting section at fork.
No splitting needed if connecting node is head of a section.
*/
if parent != nil && entry != nil && entry.node != parent.top && len(nodes) > 0 {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): fork after %s", peerId, hex(bestpeer.currentBlockHash), hex(hash))
self.splitSection(parent, entry)
self.status.lock.Lock()
self.status.values.Forks++
self.status.lock.Unlock()
}
// If new section is created, link it to parent/child sections.
sec = self.linkSections(nodes, parent, child)
if sec != nil {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): section [%s] created", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
}
self.chainLock.Unlock()
/*
If a blockpool node is reached (parent section is not nil),
activate section (unless our peer is demoted by now).
This can be the bottom half of a newly split section in case of a fork.
bestPeer is nil if we got here after our peer got demoted while processing.
In this case no activation should happen
*/
if parent != nil && !peerswitch {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): parent section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(parent))
self.activateChain(parent, bestpeer, bestpeer.switchC, nil)
}
/*
If a new section was created, register section iff head section or no child known
Activate it with this peer.
*/
if sec != nil {
// switch on section process (it is paused by switchC)
if !peerswitch {
if headSection || child == nil {
bestpeer.lock.Lock()
bestpeer.sections = append(bestpeer.sections, sec.top.hash)
bestpeer.lock.Unlock()
}
/*
Request another batch of older block hashes for parent section here.
But only once, repeating only when the section's root block arrives.
Otherwise no way to check if it arrived.
*/
bestpeer.requestBlockHashes(sec.bottom.hash)
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s): start requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
sec.activate(bestpeer)
} else {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) no longer best: delay requesting blocks for section [%s]", peerId, hex(bestpeer.currentBlockHash), sectionhex(sec))
sec.deactivate()
}
}
// If we are processing peer's head section, signal it to headSection process that it is created.
if headSection {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section registered on head section process", peerId, hex(bestpeer.currentBlockHash))
var headSec *section
switch {
case sec != nil:
headSec = sec
case child != nil:
headSec = child
default:
headSec = parent
}
if !peerswitch {
glog.V(logger.Detail).Infof("AddBlockHashes: peer <%s> (head: %s) head section [%s] created signalled to head section process", peerId, hex(bestpeer.currentBlockHash), sectionhex(headSec))
bestpeer.headSectionC <- headSec
}
}
}
/*
AddBlock is the entry point for the eth protocol to call when blockMsg is received.
It has a strict interpretation of the protocol in that if the block received has not been requested, it results in an error.
At the same time it is opportunistic in that if a requested block may be provided by any peer.
The received block is checked for PoW. Only the first PoW-valid block for a hash is considered legit.
If the block received is the head block of the current best peer, signal it to the head section process
*/
func (self *BlockPool) AddBlock(block *types.Block, peerId string) {
self.status.lock.Lock()
self.status.activePeers[peerId]++
self.status.lock.Unlock()
hash := block.Hash()
// check if block is already inserted in the blockchain
if self.hasBlock(hash) {
return
}
sender, _ := self.peers.getPeer(peerId)
if sender == nil {
return
}
sender.lock.Lock()
tdFromCurrentHead, currentBlockHash := sender.setChainInfoFromBlock(block)
entry := self.get(hash)
/* @zelig !!!
requested 5 hashes from both A & B. A responds sooner then B, process blocks. Close section.
delayed B sends you block ... UNREQUESTED. Blocked
if entry == nil {
glog.V(logger.Detail).Infof("AddBlock: unrequested block %s received from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrUnrequestedBlock, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
var bnode *node
if entry == nil {
self.nodeCacheLock.Lock()
bnode = self.findOrCreateNode(currentBlockHash, peerId)
self.nodeCacheLock.Unlock()
} else {
bnode = entry.node
}
bnode.lock.Lock()
// check if block already received
if bnode.block != nil {
glog.V(logger.Detail).Infof("AddBlock: block %s from peer <%s> (head: %s) already sent by <%s> ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.blockBy)
// register peer on node as source
if bnode.peers == nil {
bnode.peers = make(map[string]bool)
}
foundBlockCurrentHead, found := bnode.peers[sender.id]
if !found || foundBlockCurrentHead {
// if found but not FoundBlockCurrentHead, then no update
// necessary (||)
bnode.peers[sender.id] = (currentBlockHash == hash)
// for those that are false, TD will update their head
// for those that are true, TD is checked !
// this is checked at the time of TD calculation in checkTD
}
sender.setChainInfoFromNode(bnode)
} else {
/*
@zelig needs discussing
Viktor: pow check can be delayed in a go routine and therefore cache
creation is not blocking
// validate block for PoW
if !self.verifyPoW(block) {
glog.V(logger.Warn).Warnf("AddBlock: invalid PoW on block %s from peer <%s> (head: %s)", hex(hash), peerId, hex(sender.currentBlockHash))
sender.addError(ErrInvalidPoW, "%x", hash)
self.status.lock.Lock()
self.status.badPeers[peerId]++
self.status.lock.Unlock()
return
}
*/
bnode.block = block
bnode.blockBy = peerId
glog.V(logger.Detail).Infof("AddBlock: set td on node %s from peer <%s> (head: %s) to %v (was %v) ", hex(hash), peerId, hex(sender.currentBlockHash), bnode.td, tdFromCurrentHead)
bnode.td = tdFromCurrentHead
self.status.lock.Lock()
self.status.values.Blocks++
self.status.values.BlocksInPool++
self.status.lock.Unlock()
}
bnode.lock.Unlock()
currentBlockC := sender.currentBlockC
switchC := sender.switchC
sender.lock.Unlock()
// this must be called without peerlock.
// peerlock held can halt the loop and block on select forever
if tdFromCurrentHead != nil {
select {
case currentBlockC <- block:
case <-switchC: // peer is not best peer
}
}
}
func (self *BlockPool) findOrCreateNode(hash common.Hash, peerId string) (bnode *node) {
bnode, _ = self.nodeCache[hash]
if bnode == nil {
bnode = &node{
hash: hash,
hashBy: peerId,
}
self.nodeCache[hash] = bnode
// purge oversize cache
if len(self.nodeCache) > self.Config.NodeCacheSize {
delete(self.nodeCache, self.nodeCacheList[0])
self.nodeCacheList = append(self.nodeCacheList[1:], hash)
} else {
self.nodeCacheList = append(self.nodeCacheList, hash)
}
self.status.lock.Lock()
self.status.values.BlockHashes++
self.status.lock.Unlock()
}
return
}
/*
activateChain iterates down a chain section by section.
It activates the section process on incomplete sections with peer.
It relinks orphaned sections with their parent if root block (and its parent hash) is known.
*/
func (self *BlockPool) activateChain(sec *section, p *peer, switchC chan bool, connected map[common.Hash]*section) {
var i int
LOOP:
for sec != nil {
parent := sec.parent
glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s>", sectionhex(sec), p.id)
sec.activate(p)
if i > 0 && connected != nil {
connected[sec.top.hash] = sec
}
/*
Need to relink both complete and incomplete sections
An incomplete section could have been blockHashesRequestsComplete before being delinked from its parent.
*/
if parent == nil {
if sec.bottom.block != nil {
if entry := self.get(sec.bottom.block.ParentHash()); entry != nil {
parent = entry.section
glog.V(logger.Detail).Infof("activateChain: [%s]-[%s] link", sectionhex(parent), sectionhex(sec))
link(parent, sec)
}
} else {
glog.V(logger.Detail).Infof("activateChain: section [%s] activated by peer <%s> has missing root block", sectionhex(sec), p.id)
}
}
sec = parent
// stop if peer got demoted or global quit
select {
case <-switchC:
break LOOP
case <-self.quit:
break LOOP
default:
}
}
}
// check if block's actual TD (calculated after successful insertChain) is identical to TD advertised for peer's head block.
func (self *BlockPool) checkTD(nodes ...*node) {
for _, n := range nodes {
// skip check if queued future block
n.lock.RLock()
if n.td != nil && !n.block.Queued() {
glog.V(logger.Detail).Infof("peer td %v =?= block td %v", n.td, n.block.Td)
// @zelig: Commented out temp untill the rest of the network has been fixed.
if n.td.Cmp(n.block.Td) != 0 {
self.peers.peerError(n.blockBy, ErrIncorrectTD, "on block %x peer td %v =?= block td %v", n.hash, n.td, n.block.Td)
self.status.lock.Lock()
self.status.badPeers[n.blockBy]++
self.status.lock.Unlock()
}
}
n.lock.RUnlock()
}
}
// requestBlocks must run in separate go routine, otherwise
// switchpeer -> activateChain -> activate deadlocks on section process select and peers.lock
func (self *BlockPool) requestBlocks(attempts int, hashes []common.Hash) {
self.wg.Add(1)
go func() {
self.peers.requestBlocks(attempts, hashes)
self.wg.Done()
}()
}
// convenience methods to access adjacent sections
func (self *BlockPool) getParent(sec *section) *section {
self.chainLock.RLock()
defer self.chainLock.RUnlock()
return sec.parent
}
func (self *BlockPool) getChild(sec *section) *section {
self.chainLock.RLock()
defer self.chainLock.RUnlock()
return sec.child
}
// accessor and setter for entries in the pool
func (self *BlockPool) get(hash common.Hash) *entry {
self.lock.RLock()
defer self.lock.RUnlock()
return self.pool[hash]
}
func (self *BlockPool) set(hash common.Hash, e *entry) {
self.lock.Lock()
defer self.lock.Unlock()
self.pool[hash] = e
}
// accessor and setter for total difficulty
func (self *BlockPool) getTD() *big.Int {
self.lock.RLock()
defer self.lock.RUnlock()
return self.td
}
func (self *BlockPool) setTD(td *big.Int) {
self.lock.Lock()
defer self.lock.Unlock()
self.td = td
}
func (self *BlockPool) remove(sec *section) {
// delete node entries from pool index under pool lock
self.lock.Lock()
defer self.lock.Unlock()
for _, node := range sec.nodes {
delete(self.pool, node.hash)
}
if sec.initialised && sec.poolRootIndex != 0 {
self.status.lock.Lock()
self.status.values.BlocksInPool -= len(sec.nodes) - sec.missing
self.status.lock.Unlock()
}
}
// get/put for optimised allocation similar to sync.Pool
func (self *BlockPool) getHashSlice() (s []common.Hash) {
select {
case s = <-self.hashSlicePool:
default:
s = make([]common.Hash, self.Config.BlockBatchSize)
}
return
}
func (self *BlockPool) putHashSlice(s []common.Hash) {
if len(s) == self.Config.BlockBatchSize {
select {
case self.hashSlicePool <- s:
default:
}
}
}
// pretty prints hash (byte array) with first 4 bytes in hex
func hex(hash common.Hash) (name string) {
if (hash == common.Hash{}) {
name = ""
} else {
name = fmt.Sprintf("%x", hash[:4])
}
return
}
// pretty prints a section using first 4 bytes in hex of bottom and top blockhash of the section
func sectionhex(section *section) (name string) {
if section == nil {
name = ""
} else {
name = fmt.Sprintf("%x-%x", section.bottom.hash[:4], section.top.hash[:4])
}
return
}

View File

@ -1,433 +0,0 @@
package blockpool
import (
"testing"
"time"
)
// using the mock framework in blockpool_util_test
// we test various scenarios here
func TestPeerWithKnownBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.refBlockChain[0] = nil
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer0 := blockPoolTester.newPeer("0", 1, 0)
peer0.AddPeer()
blockPool.Wait(waitTimeout)
blockPool.Stop()
// no request on known block
peer0.checkBlockHashesRequests()
}
func TestPeerWithKnownParentBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.initRefBlockChain(1)
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer0 := blockPoolTester.newPeer("0", 1, 1)
peer0.AddPeer()
peer0.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
peer0.checkBlocksRequests([]int{1})
peer0.checkBlockHashesRequests()
blockPoolTester.refBlockChain[1] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestSimpleChain(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(2)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 2, 2)
peer1.AddPeer()
peer1.serveBlocks(1, 2)
go peer1.serveBlockHashes(2, 1, 0)
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[2] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestChainConnectingWithParentHash(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 3, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestMultiSectionChain(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(5)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 5, 5)
peer1.AddPeer()
go peer1.serveBlocks(4, 5)
go peer1.serveBlockHashes(5, 4, 3)
go peer1.serveBlocks(2, 3, 4)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[5] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestNewBlocksOnPartialChain(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(7)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 5, 5)
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[5] = 5
peer1.AddPeer()
go peer1.serveBlocks(4, 5) // partially complete section
go peer1.serveBlockHashes(5, 4, 3)
peer1.serveBlocks(3, 4) // partially complete section
// peer1 found new blocks
peer1.td = 7
peer1.currentBlock = 7
peer1.AddPeer()
peer1.sendBlocks(6, 7)
go peer1.serveBlockHashes(7, 6, 5)
go peer1.serveBlocks(2, 3)
go peer1.serveBlocks(5, 6)
go peer1.serveBlockHashes(3, 2, 1) // tests that hash request from known chain root is remembered
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[7] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchUp(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(7)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 6, 6)
peer2 := blockPoolTester.newPeer("peer2", 7, 7)
peer1.AddPeer()
go peer1.serveBlocks(5, 6)
go peer1.serveBlockHashes(6, 5, 4, 3) //
peer1.serveBlocks(2, 3) // section partially complete, block 3 will be preserved after peer demoted
peer2.AddPeer() // peer2 is promoted as best peer, peer1 is demoted
go peer2.serveBlocks(6, 7) //
go peer2.serveBlocks(4, 5) // tests that block request for earlier section is remembered
go peer1.serveBlocks(3, 4) // tests that connecting section by demoted peer is remembered and blocks are accepted from demoted peer
go peer2.serveBlockHashes(3, 2, 1, 0) // tests that known chain section is activated, hash requests from 3 is remembered
peer2.serveBlocks(0, 1, 2) // final blocks linking to blockchain sent
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[7] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownOverlapSectionWithoutRootBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 4, 4)
peer2 := blockPoolTester.newPeer("peer2", 6, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
peer2.serveBlockHashes(6, 5, 4) // no go: make sure skeleton is created
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlockHashes(4, 3, 2, 1, 0) //
go peer1.serveBlocks(3, 4) //
go peer1.serveBlocks(4, 5) // tests that section set by demoted peer is remembered and blocks are accepted from new peer if they have it even if peers original TD is lower
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownOverlapSectionWithRootBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(6)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 4, 4)
peer2 := blockPoolTester.newPeer("peer2", 6, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
go peer2.serveBlockHashes(6, 5, 4) //
peer2.serveBlocks(3, 4) // !incomplete section
time.Sleep(100 * time.Millisecond) // make sure block 4 added
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlockHashes(4, 3, 2, 1, 0) // tests that hash request are directly connecting if the head block exists
go peer1.serveBlocks(4, 5) // tests that section set by demoted peer is remembered and blocks are accepted from new peer if they have it even if peers original TD is lower
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchDownDisjointSection(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 3, 3)
peer2 := blockPoolTester.newPeer("peer2", 6, 6)
peer2.AddPeer()
peer2.serveBlocks(5, 6) // partially complete, section will be preserved
go peer2.serveBlockHashes(6, 5, 4) //
peer2.serveBlocks(3, 4, 5) //
time.Sleep(100 * time.Millisecond) // make sure blocks are received
peer1.AddPeer() // inferior peer1 is promoted as best peer
blockPool.RemovePeer("peer2") // peer2 disconnects
go peer1.serveBlocks(2, 3) //
go peer1.serveBlockHashes(3, 2, 1) //
peer1.serveBlocks(0, 1, 2) //
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{} // tests that idle sections are not inserted in blockchain
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestPeerSwitchBack(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(8)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 11, 11)
peer2 := blockPoolTester.newPeer("peer2", 8, 8)
peer2.AddPeer()
go peer2.serveBlocks(7, 8)
go peer2.serveBlockHashes(8, 7, 6)
go peer2.serveBlockHashes(6, 5, 4)
peer2.serveBlocks(4, 5) // section partially complete
peer1.AddPeer() // peer1 is promoted as best peer
peer1.serveBlocks(10, 11) //
peer1.serveBlockHashes(11, 10) // only gives useless results
blockPool.RemovePeer("peer1") // peer1 disconnects
go peer2.serveBlockHashes(4, 3, 2, 1, 0) // tests that asking for hashes from 4 is remembered
go peer2.serveBlocks(3, 4, 5, 6, 7, 8) // tests that section 4, 5, 6 and 7, 8 are remembered for missing blocks
peer2.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[8] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSimple(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[6] = 10
peer1 := blockPoolTester.newPeer("peer1", 9, 9)
peer2 := blockPoolTester.newPeer("peer2", 10, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7, 3, 2)
peer1.serveBlocks(1, 2, 3, 7, 8)
peer2.AddPeer() // peer2 is promoted as best peer
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // fork on 3 -> 4 (earlier child: 7)
go peer2.serveBlocks(1, 2, 3, 4, 5)
go peer2.serveBlockHashes(2, 1, 0)
peer2.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[6] = []int{}
blockPoolTester.refBlockChain[3] = []int{4}
delete(blockPoolTester.refBlockChain, 7)
delete(blockPoolTester.refBlockChain, 8)
delete(blockPoolTester.refBlockChain, 9)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSwitchBackByNewBlocks(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(11)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[6] = 10
peer1 := blockPoolTester.newPeer("peer1", 9, 9)
peer2 := blockPoolTester.newPeer("peer2", 10, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9) //
go peer1.serveBlockHashes(9, 8, 7, 3, 2) //
peer1.serveBlocks(7, 8) // partial section
// time.Sleep(1 * time.Second)
peer2.AddPeer() //
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(1, 2, 3, 4, 5) //
// peer1 finds new blocks
peer1.td = 11
peer1.currentBlock = 11
peer1.AddPeer()
go peer1.serveBlocks(10, 11)
go peer1.serveBlockHashes(11, 10, 9)
go peer1.serveBlocks(9, 10)
// time.Sleep(1 * time.Second)
go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered
go peer1.serveBlockHashes(2, 1, 0) // tests that hash request from root of connecting chain section (added by demoted peer) is remembered
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[11] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkSwitchBackByPeerSwitchBack(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[6] = 10
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[6] = 10
peer1 := blockPoolTester.newPeer("peer1", 9, 9)
peer2 := blockPoolTester.newPeer("peer2", 10, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7, 3, 2)
peer1.serveBlocks(7, 8)
peer2.AddPeer()
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
peer2.serveBlocks(2, 3, 4, 5) //
blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer
go peer1.serveBlocks(1, 2) //
go peer1.serveBlockHashes(2, 1, 0) //
go peer1.serveBlocks(3, 7) // tests that block requests on earlier fork are remembered and orphan section relinks to existing parent block
peer1.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[9] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}
func TestForkCompleteSectionSwitchBackByPeerSwitchBack(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(9)
blockPoolTester.refBlockChain[3] = []int{4, 7}
delete(blockPoolTester.refBlockChain, 6)
blockPool.Start()
blockPoolTester.tds = make(map[int]int)
blockPoolTester.tds[6] = 10
peer1 := blockPoolTester.newPeer("peer1", 9, 9)
peer2 := blockPoolTester.newPeer("peer2", 10, 6)
peer1.AddPeer()
go peer1.serveBlocks(8, 9)
go peer1.serveBlockHashes(9, 8, 7)
peer1.serveBlocks(3, 7, 8) // make sure this section is complete
// time.Sleep(2 * time.Second) //
peer1.serveBlockHashes(7, 3, 2) // block 3/7 is section boundary
peer1.serveBlocks(2, 3) // partially complete sections block 2 missing
peer2.AddPeer() //
go peer2.serveBlocks(5, 6) //
go peer2.serveBlockHashes(6, 5, 4, 3, 2) // peer2 forks on block 3
time.Sleep(100 * time.Millisecond) //
peer2.serveBlocks(2, 3, 4, 5) // block 2 still missing.
blockPool.RemovePeer("peer2") // peer2 disconnects, peer1 is promoted again as best peer
go peer1.serveBlockHashes(2, 1) //
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[9] = []int{}
blockPoolTester.refBlockChain[3] = []int{7}
delete(blockPoolTester.refBlockChain, 6)
delete(blockPoolTester.refBlockChain, 5)
delete(blockPoolTester.refBlockChain, 4)
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}

View File

@ -1,373 +0,0 @@
package blockpool
import (
"fmt"
"math/big"
"sync"
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/event"
"github.com/ethereum/go-ethereum/pow"
)
var (
waitTimeout = 60 * time.Second
testBlockHashesRequestInterval = 10 * time.Millisecond
testBlocksRequestInterval = 10 * time.Millisecond
requestWatchInterval = 10 * time.Millisecond
)
// test blockChain is an integer trie
type blockChain map[int][]int
// blockPoolTester provides the interface between tests and a blockPool
//
// refBlockChain is used to guide which blocks will be accepted as valid
// blockChain gives the current state of the blockchain and
// accumulates inserts so that we can check the resulting chain
type blockPoolTester struct {
hashPool *test.TestHashPool
lock sync.RWMutex
reqlock sync.RWMutex
blocksRequestsMap map[int]bool
refBlockChain blockChain
blockChain blockChain
blockPool *BlockPool
t *testing.T
chainEvents *event.TypeMux
tds map[int]int
}
func newTestBlockPool(t *testing.T) (hashPool *test.TestHashPool, blockPool *BlockPool, b *blockPoolTester) {
hashPool = test.NewHashPool()
b = &blockPoolTester{
t: t,
hashPool: hashPool,
blockChain: make(blockChain),
refBlockChain: make(blockChain),
blocksRequestsMap: make(map[int]bool),
chainEvents: &event.TypeMux{},
}
b.blockPool = New(b.hasBlock, b.insertChain, b.verifyPoW, b.chainEvents, common.Big0)
blockPool = b.blockPool
blockPool.Config.BlockHashesRequestInterval = testBlockHashesRequestInterval
blockPool.Config.BlocksRequestInterval = testBlocksRequestInterval
return
}
func (self *blockPoolTester) Errorf(format string, params ...interface{}) {
// fmt.Printf(format+"\n", params...)
self.t.Errorf(format, params...)
}
// blockPoolTester implements the 3 callbacks needed by the blockPool:
// hasBlock, insetChain, verifyPoW as well as provides the eventer
// to subscribe to head insertions
func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) {
self.lock.RLock()
defer self.lock.RUnlock()
indexes := self.hashPool.HashesToIndexes([]common.Hash{block})
i := indexes[0]
_, ok = self.blockChain[i]
// fmt.Printf("has block %v (%x...): %v\n", i, block[0:4], ok)
return
}
// mock insertChain relies on refBlockChain to determine block validity
func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
self.lock.Lock()
defer self.lock.Unlock()
var parent, child int
var children, refChildren []int
var ok bool
for _, block := range blocks {
child = self.hashPool.HashesToIndexes([]common.Hash{block.Hash()})[0]
td := child
if self.tds != nil {
td, ok = self.tds[child]
}
if !ok {
td = child
}
block.Td = big.NewInt(int64(td))
_, ok = self.blockChain[child]
if ok {
// fmt.Printf("block %v already in blockchain\n", child)
continue // already in chain
}
parent = self.hashPool.HashesToIndexes([]common.Hash{block.ParentHeaderHash})[0]
children, ok = self.blockChain[parent]
if !ok {
return fmt.Errorf("parent %v not in blockchain ", parent)
}
ok = false
var found bool
refChildren, found = self.refBlockChain[parent]
if found {
for _, c := range refChildren {
if c == child {
ok = true
}
}
if !ok {
return fmt.Errorf("invalid block %v", child)
}
} else {
ok = true
}
if ok {
// accept any blocks if parent not in refBlockChain
self.blockChain[parent] = append(children, child)
self.blockChain[child] = nil
}
}
return nil
}
// mock soft block validation always succeeds
func (self *blockPoolTester) verifyPoW(pblock pow.Block) bool {
return true
}
// test helper that compares the resulting blockChain to the desired blockChain
func (self *blockPoolTester) checkBlockChain(blockChain map[int][]int) {
self.lock.RLock()
defer self.lock.RUnlock()
// for k, v := range self.blockChain {
// fmt.Printf("got: %v -> %v\n", k, v)
// }
// for k, v := range blockChain {
// fmt.Printf("expected: %v -> %v\n", k, v)
// }
if len(blockChain) != len(self.blockChain) {
self.Errorf("blockchain incorrect (zlength differ)")
}
for k, v := range blockChain {
vv, ok := self.blockChain[k]
if !ok || !test.ArrayEq(v, vv) {
self.Errorf("blockchain incorrect on %v -> %v (!= %v)", k, vv, v)
}
}
}
// peerTester provides the peer callbacks for the blockPool
// it registers actual callbacks so that the result can be compared to desired behaviour
// provides helper functions to mock the protocol calls to the blockPool
type peerTester struct {
// containers to record request and error callbacks
blockHashesRequests []int
blocksRequests [][]int
blocksRequestsMap map[int]bool
peerErrors []int
blockPool *BlockPool
hashPool *test.TestHashPool
lock sync.RWMutex
bt *blockPoolTester
id string
td int
currentBlock int
t *testing.T
}
// peerTester constructor takes hashPool and blockPool from the blockPoolTester
func (self *blockPoolTester) newPeer(id string, td int, cb int) *peerTester {
return &peerTester{
id: id,
td: td,
currentBlock: cb,
hashPool: self.hashPool,
blockPool: self.blockPool,
t: self.t,
bt: self,
blocksRequestsMap: self.blocksRequestsMap,
}
}
func (self *peerTester) Errorf(format string, params ...interface{}) {
// fmt.Printf(format+"\n", params...)
self.t.Errorf(format, params...)
}
// helper to compare actual and expected block requests
func (self *peerTester) checkBlocksRequests(blocksRequests ...[]int) {
if len(blocksRequests) > len(self.blocksRequests) {
self.Errorf("blocks requests incorrect (length differ)\ngot %v\nexpected %v", self.blocksRequests, blocksRequests)
} else {
for i, rr := range blocksRequests {
r := self.blocksRequests[i]
if !test.ArrayEq(r, rr) {
self.Errorf("blocks requests incorrect\ngot %v\nexpected %v", self.blocksRequests, blocksRequests)
}
}
}
}
// helper to compare actual and expected block hash requests
func (self *peerTester) checkBlockHashesRequests(blocksHashesRequests ...int) {
rr := blocksHashesRequests
self.lock.RLock()
r := self.blockHashesRequests
self.lock.RUnlock()
if len(r) != len(rr) {
self.Errorf("block hashes requests incorrect (length differ)\ngot %v\nexpected %v", r, rr)
} else {
if !test.ArrayEq(r, rr) {
self.Errorf("block hashes requests incorrect\ngot %v\nexpected %v", r, rr)
}
}
}
// waiter function used by peer.serveBlocks
// blocking until requests appear
// this mocks proper wire protocol behaviour
// since block requests are sent to any random peers
// block request map is shared between peers
// times out after waitTimeout
func (self *peerTester) waitBlocksRequests(blocksRequest ...int) {
timeout := time.After(waitTimeout)
rr := blocksRequest
for {
self.lock.RLock()
r := self.blocksRequestsMap
// fmt.Printf("[%s] blocks request check %v (%v)\n", self.id, rr, r)
i := 0
for i = 0; i < len(rr); i++ {
_, ok := r[rr[i]]
if !ok {
break
}
}
self.lock.RUnlock()
if i == len(rr) {
return
}
time.Sleep(requestWatchInterval)
select {
case <-timeout:
default:
}
}
}
// waiter function used by peer.serveBlockHashes
// blocking until requests appear
// this mocks proper wire protocol behaviour
// times out after a period
func (self *peerTester) waitBlockHashesRequests(blocksHashesRequest int) {
timeout := time.After(waitTimeout)
rr := blocksHashesRequest
for i := 0; ; {
self.lock.RLock()
r := self.blockHashesRequests
self.lock.RUnlock()
// fmt.Printf("[%s] block hash request check %v (%v)\n", self.id, rr, r)
for ; i < len(r); i++ {
if rr == r[i] {
return
}
}
time.Sleep(requestWatchInterval)
select {
case <-timeout:
default:
}
}
}
// mocks a simple blockchain 0 (genesis) ... n (head)
func (self *blockPoolTester) initRefBlockChain(n int) {
for i := 0; i < n; i++ {
self.refBlockChain[i] = []int{i + 1}
}
}
// peerTester functions that mimic protocol calls to the blockpool
// registers the peer with the blockPool
func (self *peerTester) AddPeer() (best bool) {
hash := self.hashPool.IndexesToHashes([]int{self.currentBlock})[0]
best, _ = self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
return
}
// peer sends blockhashes if and when gets a request
func (self *peerTester) serveBlockHashes(indexes ...int) {
// fmt.Printf("ready to serve block hashes %v\n", indexes)
self.waitBlockHashesRequests(indexes[0])
self.sendBlockHashes(indexes...)
}
// peer sends blockhashes not waiting for request
func (self *peerTester) sendBlockHashes(indexes ...int) {
// fmt.Printf("adding block hashes %v\n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
i := 1
next := func() (hash common.Hash, ok bool) {
if i < len(hashes) {
hash = hashes[i]
ok = true
i++
}
return
}
self.blockPool.AddBlockHashes(next, self.id)
}
// peer sends blocks if and when there is a request
// (in the shared request store, not necessarily to a specific peer)
func (self *peerTester) serveBlocks(indexes ...int) {
// fmt.Printf("ready to serve blocks %v\n", indexes[1:])
self.waitBlocksRequests(indexes[1:]...)
self.sendBlocks(indexes...)
}
// peer sends blocks not waiting for request
func (self *peerTester) sendBlocks(indexes ...int) {
// fmt.Printf("adding blocks %v \n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
for i := 1; i < len(hashes); i++ {
// fmt.Printf("adding block %v %x\n", indexes[i], hashes[i][:4])
self.blockPool.AddBlock(&types.Block{HeaderHash: hashes[i], ParentHeaderHash: hashes[i-1]}, self.id)
}
}
// the 3 mock peer callbacks
// records block hashes requests by the blockPool
// -1 is special: not found (a hash never seen)
func (self *peerTester) requestBlockHashes(hash common.Hash) error {
indexes := self.hashPool.HashesToIndexes([]common.Hash{hash})
// fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4])
self.lock.Lock()
defer self.lock.Unlock()
self.blockHashesRequests = append(self.blockHashesRequests, indexes[0])
return nil
}
// records block requests by the blockPool
func (self *peerTester) requestBlocks(hashes []common.Hash) error {
indexes := self.hashPool.HashesToIndexes(hashes)
// fmt.Printf("blocks request %v %x...\n", indexes, hashes[0][:4])
self.bt.reqlock.Lock()
defer self.bt.reqlock.Unlock()
self.blocksRequests = append(self.blocksRequests, indexes)
for _, i := range indexes {
self.blocksRequestsMap[i] = true
}
return nil
}
// records the error codes of all the peerErrors found the blockPool
func (self *peerTester) peerError(err *errs.Error) {
self.peerErrors = append(self.peerErrors, err.Code)
if err.Fatal() {
self.blockPool.RemovePeer(self.id)
}
}

View File

@ -1,49 +0,0 @@
package blockpool
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/event"
)
func TestBlockPoolConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}}
blockPool.Start()
c := blockPool.Config
test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, blockHashesBatchSize, t)
test.CheckInt("BlockBatchSize", c.BlockBatchSize, blockBatchSize, t)
test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t)
test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t)
test.CheckInt("NodeCacheSize", c.NodeCacheSize, nodeCacheSize, t)
test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, blockHashesRequestInterval, t)
test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, blocksRequestInterval, t)
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t)
test.CheckDuration("StatusUpdateInterval", c.StatusUpdateInterval, statusUpdateInterval, t)
}
func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}, chainEvents: &event.TypeMux{}}
c := &Config{128, 32, 1, 0, 500, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second, 4 * time.Second}
blockPool.Config = c
blockPool.Start()
test.CheckInt("BlockHashesBatchSize", c.BlockHashesBatchSize, 128, t)
test.CheckInt("BlockBatchSize", c.BlockBatchSize, 32, t)
test.CheckInt("BlocksRequestRepetition", c.BlocksRequestRepetition, blocksRequestRepetition, t)
test.CheckInt("BlocksRequestMaxIdleRounds", c.BlocksRequestMaxIdleRounds, blocksRequestMaxIdleRounds, t)
test.CheckInt("NodeCacheSize", c.NodeCacheSize, 500, t)
test.CheckDuration("BlockHashesRequestInterval", c.BlockHashesRequestInterval, 300*time.Millisecond, t)
test.CheckDuration("BlocksRequestInterval", c.BlocksRequestInterval, 100*time.Millisecond, t)
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t)
test.CheckDuration("StatusUpdateInterval", c.StatusUpdateInterval, 4*time.Second, t)
}

View File

@ -1,224 +0,0 @@
package blockpool
import (
"testing"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow"
)
func TestInvalidBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(2)
blockPoolTester.refBlockChain[2] = []int{}
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[2] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInvalidBlock {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrInvalidBlock)
}
} else {
t.Errorf("expected %v error, got %v", ErrInvalidBlock, peer1.peerErrors)
}
}
func TestVerifyPoW(t *testing.T) {
t.Skip() // :FIXME:
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
first := false
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
bb, _ := b.(*types.Block)
indexes := blockPoolTester.hashPool.HashesToIndexes([]common.Hash{bb.Hash()})
if indexes[0] == 2 && !first {
first = true
return false
} else {
return true
}
}
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer2 := blockPoolTester.newPeer("peer2", 1, 3)
peer1.AddPeer()
peer2.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2, 3)
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
return true
}
peer2.serveBlocks(1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInvalidPoW {
t.Errorf("wrong error, expected %v, got %v", ErrInvalidPoW, peer1.peerErrors[0])
}
} else {
t.Errorf("expected %v error, got %v", ErrInvalidPoW, peer1.peerErrors)
}
}
func TestUnrequestedBlock(t *testing.T) {
t.Skip() // :FIXME:
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
peer1.sendBlocks(1, 2)
blockPool.Stop()
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrUnrequestedBlock {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrUnrequestedBlock)
}
} else {
t.Errorf("expected %v error, got %v", ErrUnrequestedBlock, peer1.peerErrors)
}
}
func TestErrInsufficientChainInfo(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPool.Config.BlockHashesTimeout = 100 * time.Millisecond
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
blockPool.Wait(waitTimeout)
blockPool.Stop()
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrInsufficientChainInfo {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrInsufficientChainInfo)
}
} else {
t.Errorf("expected %v error, got %v", ErrInsufficientChainInfo, peer1.peerErrors)
}
}
func TestIncorrectTD(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) == 1 {
if peer1.peerErrors[0] != ErrIncorrectTD {
t.Errorf("wrong error, got %v, expected %v", peer1.peerErrors[0], ErrIncorrectTD)
}
} else {
t.Errorf("expected %v error, got %v", ErrIncorrectTD, peer1.peerErrors)
}
}
func TestSkipIncorrectTDonFutureBlocks(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(3)
blockPool.insertChain = func(blocks types.Blocks) error {
err := blockPoolTester.insertChain(blocks)
if err == nil {
for _, block := range blocks {
if block.Td.Cmp(common.Big3) == 0 {
block.Td = common.Big3
block.SetQueued(true)
break
}
}
}
return err
}
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 3, 3)
peer1.AddPeer()
go peer1.serveBlocks(2, 3)
go peer1.serveBlockHashes(3, 2, 1, 0)
peer1.serveBlocks(0, 1, 2)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[3] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
if len(peer1.peerErrors) > 0 {
t.Errorf("expected no error, got %v (1 of %v)", peer1.peerErrors[0], len(peer1.peerErrors))
}
}
func TestPeerSuspension(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPool.Config.PeerSuspensionInterval = 100 * time.Millisecond
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 3, 3)
peer1.AddPeer()
bestpeer, _ := blockPool.peers.getPeer("peer1")
if bestpeer == nil {
t.Errorf("peer1 not best peer")
return
}
peer1.serveBlocks(2, 3)
blockPool.peers.peerError("peer1", 0, "")
bestpeer, _ = blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed on error")
return
}
peer1.AddPeer()
bestpeer, _ = blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed on reconnect")
return
}
time.Sleep(100 * time.Millisecond)
peer1.AddPeer()
bestpeer, _ = blockPool.peers.getPeer("peer1")
if bestpeer == nil {
t.Errorf("peer1 not connected after PeerSuspensionInterval")
return
}
blockPool.Stop()
}

View File

@ -1,639 +0,0 @@
package blockpool
import (
"math/big"
"math/rand"
"sort"
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
// the blockpool's model of a peer
type peer struct {
lock sync.RWMutex
// last known blockchain status
td *big.Int
tdAdvertised bool
currentBlockHash common.Hash
currentBlock *types.Block
parentHash common.Hash
headSection *section
id string
// peer callbacks
requestBlockHashes func(common.Hash) error
requestBlocks func([]common.Hash) error
peerError func(*errs.Error)
errors *errs.Errors
sections []common.Hash
// channels to push new head block and head section for peer a
currentBlockC chan *types.Block
headSectionC chan *section
// channels to signal peer switch and peer quit to section processes
idleC chan bool
switchC chan bool
bp *BlockPool
// timers for head section process
blockHashesRequestTimer <-chan time.Time
blocksRequestTimer <-chan time.Time
headInfoTimer <-chan time.Time
bestIdleTimer <-chan time.Time
addToBlacklist func(id string)
idle bool
}
// peers is the component keeping a record of peers in a hashmap
//
type peers struct {
lock sync.RWMutex
bllock sync.Mutex
bp *BlockPool
errors *errs.Errors
peers map[string]*peer
best *peer
status *status
blacklist map[string]time.Time
}
// peer constructor
func (self *peers) newPeer(
td *big.Int,
currentBlockHash common.Hash,
id string,
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (p *peer) {
p = &peer{
errors: self.errors,
td: td,
currentBlockHash: currentBlockHash,
id: id,
requestBlockHashes: requestBlockHashes,
requestBlocks: requestBlocks,
peerError: peerError,
currentBlockC: make(chan *types.Block),
headSectionC: make(chan *section),
switchC: make(chan bool),
bp: self.bp,
idle: true,
addToBlacklist: self.addToBlacklist,
}
close(p.switchC) //! hack :((((
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
// dispatches an error to a peer if still connected, adds it to the blacklist
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
peer, ok := self.peers[id]
self.lock.RUnlock()
if ok {
peer.addError(code, format, params...)
} else {
self.addToBlacklist(id)
}
}
// record time of offence in blacklist to implement suspension for PeerSuspensionInterval
func (self *peers) addToBlacklist(id string) {
self.bllock.Lock()
defer self.bllock.Unlock()
self.blacklist[id] = time.Now()
}
// suspended checks if peer is still suspended, caller should hold peers.lock
func (self *peers) suspended(id string) (s bool) {
self.bllock.Lock()
defer self.bllock.Unlock()
if suspendedAt, ok := self.blacklist[id]; ok {
if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
// no longer suspended, delete entry
delete(self.blacklist, id)
}
}
return
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
if err.Fatal() {
self.addToBlacklist(self.id)
} else {
go self.bp.peers.removePeer(self.id, false)
}
}
// caller must hold peer lock
func (self *peer) setChainInfo(td *big.Int, currentBlockHash common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
if self.currentBlockHash != currentBlockHash {
previousBlockHash := self.currentBlockHash
glog.V(logger.Debug).Infof("addPeer: Update peer <%s> with td %v (was %v) and current block %s (was %v)", self.id, td, self.td, hex(currentBlockHash), hex(previousBlockHash))
self.td = td
self.currentBlockHash = currentBlockHash
self.currentBlock = nil
self.parentHash = common.Hash{}
self.headSection = nil
}
self.tdAdvertised = true
}
func (self *peer) setChainInfoFromBlock(block *types.Block) (td *big.Int, currentBlockHash common.Hash) {
hash := block.Hash()
// this happens when block came in a newblock message but
// also if sent in a blockmsg (for instance, if we requested, only if we
// dont apply on blockrequests the restriction of flood control)
currentBlockHash = self.currentBlockHash
if currentBlockHash == hash {
if self.currentBlock == nil {
// signal to head section process
glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) received\n", hex(hash), self.id, hex(currentBlockHash))
td = self.td
} else {
glog.V(logger.Detail).Infof("AddBlock: head block %s for peer <%s> (head: %s) already known", hex(hash), self.id, hex(currentBlockHash))
}
}
return
}
// this will use the TD given by the first peer to update peer td, this helps second best peer selection
func (self *peer) setChainInfoFromNode(n *node) {
// in case best peer is lost
block := n.block
hash := block.Hash()
if n.td != nil && n.td.Cmp(self.td) > 0 {
glog.V(logger.Detail).Infof("AddBlock: update peer <%s> - head: %v->%v - TD: %v->%v", self.id, hex(self.currentBlockHash), hex(hash), self.td, n.td)
self.td = n.td
self.currentBlockHash = block.Hash()
self.parentHash = block.ParentHash()
self.currentBlock = block
self.headSection = nil
}
}
// distribute block request among known peers
func (self *peers) requestBlocks(attempts int, hashes []common.Hash) {
self.lock.RLock()
defer self.lock.RUnlock()
peerCount := len(self.peers)
// on first attempt use the best peer
if attempts == 0 && self.best != nil {
glog.V(logger.Detail).Infof("request %v missing blocks from best peer <%s>", len(hashes), self.best.id)
self.best.requestBlocks(hashes)
return
}
repetitions := self.bp.Config.BlocksRequestRepetition
if repetitions > peerCount {
repetitions = peerCount
}
i := 0
indexes := rand.Perm(peerCount)[0:repetitions]
sort.Ints(indexes)
glog.V(logger.Detail).Infof("request %v missing blocks from %v/%v peers", len(hashes), repetitions, peerCount)
for _, peer := range self.peers {
if i == indexes[0] {
glog.V(logger.Detail).Infof("request length: %v", len(hashes))
glog.V(logger.Detail).Infof("request %v missing blocks [%x/%x] from peer <%s>", len(hashes), hashes[0][:4], hashes[len(hashes)-1][:4], peer.id)
peer.requestBlocks(hashes)
indexes = indexes[1:]
if len(indexes) == 0 {
break
}
}
i++
}
self.bp.putHashSlice(hashes)
}
// addPeer implements the logic for blockpool.AddPeer
// returns 2 bool values
// 1. true iff peer is promoted as best peer in the pool
// 2. true iff peer is still suspended
func (self *peers) addPeer(
td *big.Int,
currentBlockHash common.Hash,
id string,
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (best bool, suspended bool) {
self.lock.Lock()
defer self.lock.Unlock()
var previousBlockHash common.Hash
if self.suspended(id) {
suspended = true
return
}
p, found := self.peers[id]
if found {
// when called on an already connected peer, it means a newBlockMsg is received
// peer head info is updated
p.setChainInfo(td, currentBlockHash)
self.status.lock.Lock()
self.status.values.NewBlocks++
self.status.lock.Unlock()
} else {
p = self.newPeer(td, currentBlockHash, id, requestBlockHashes, requestBlocks, peerError)
self.status.lock.Lock()
self.status.peers[id]++
self.status.values.NewBlocks++
self.status.lock.Unlock()
glog.V(logger.Debug).Infof("addPeer: add new peer <%v> with td %v and current block %s", id, td, hex(currentBlockHash))
}
// check if peer's current head block is known
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
glog.V(logger.Debug).Infof("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
return false, false
}
if self.best == p {
// new block update for active current best peer -> request hashes
glog.V(logger.Debug).Infof("addPeer: <%s> already the best peer. Request new head section info from %s", id, hex(currentBlockHash))
if (previousBlockHash != common.Hash{}) {
glog.V(logger.Detail).Infof("addPeer: <%s> head changed: %s -> %s ", id, hex(previousBlockHash), hex(currentBlockHash))
p.headSectionC <- nil
if entry := self.bp.get(previousBlockHash); entry != nil {
glog.V(logger.Detail).Infof("addPeer: <%s> previous head : %v found in pool, activate", id, hex(previousBlockHash))
self.bp.activateChain(entry.section, p, p.switchC, nil)
p.sections = append(p.sections, previousBlockHash)
}
}
best = true
} else {
// baseline is our own TD
currentTD := self.bp.getTD()
bestpeer := self.best
if bestpeer != nil {
bestpeer.lock.RLock()
defer bestpeer.lock.RUnlock()
currentTD = self.best.td
}
if td.Cmp(currentTD) > 0 {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v > current td %v) promoted best peer", id, td, currentTD)
// fmt.Printf("best peer %v - \n", bestpeer, id)
self.bp.switchPeer(bestpeer, p)
self.best = p
best = true
}
}
return
}
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string, del bool) {
self.lock.Lock()
defer self.lock.Unlock()
p, found := self.peers[id]
if !found {
return
}
p.lock.Lock()
defer p.lock.Unlock()
if del {
delete(self.peers, id)
glog.V(logger.Debug).Infof("addPeer: remove peer <%v> (td: %v)", id, p.td)
}
// if current best peer is removed, need to find a better one
if self.best == p {
var newp *peer
// only peers that are ahead of us are considered
max := self.bp.getTD()
// peer with the highest self-acclaimed TD is chosen
for _, pp := range self.peers {
// demoted peer's td should be 0
if pp.id == id {
pp.td = common.Big0
pp.currentBlockHash = common.Hash{}
continue
}
pp.lock.RLock()
if pp.td.Cmp(max) > 0 {
max = pp.td
newp = pp
}
pp.lock.RUnlock()
}
if newp != nil {
self.status.lock.Lock()
self.status.bestPeers[p.id]++
self.status.lock.Unlock()
glog.V(logger.Debug).Infof("addPeer: peer <%v> (td: %v) promoted best peer", newp.id, newp.td)
} else {
glog.V(logger.Warn).Infof("addPeer: no suitable peers found")
}
self.best = newp
// fmt.Printf("remove peer %v - %v\n", p.id, newp)
self.bp.switchPeer(p, newp)
}
}
// switchPeer launches section processes
func (self *BlockPool) switchPeer(oldp, newp *peer) {
// first quit AddBlockHashes, requestHeadSection and activateChain
// by closing the old peer's switchC channel
if oldp != nil {
glog.V(logger.Detail).Infof("<%s> quit peer processes", oldp.id)
// fmt.Printf("close %v - %v\n", oldp.id, newp)
close(oldp.switchC)
}
if newp != nil {
// if new best peer has no head section yet, create it and run it
// otherwise head section is an element of peer.sections
newp.idleC = make(chan bool)
newp.switchC = make(chan bool)
if newp.headSection == nil {
glog.V(logger.Detail).Infof("[%s] head section for [%s] not created, requesting info", newp.id, hex(newp.currentBlockHash))
if newp.idle {
self.wg.Add(1)
newp.idle = false
self.syncing()
}
go func() {
newp.run()
if !newp.idle {
self.wg.Done()
newp.idle = true
}
}()
}
var connected = make(map[common.Hash]*section)
var sections []common.Hash
for _, hash := range newp.sections {
glog.V(logger.Detail).Infof("activate chain starting from section [%s]", hex(hash))
// if section not connected (ie, top of a contiguous sequence of sections)
if connected[hash] == nil {
// if not deleted, then reread from pool (it can be orphaned top half of a split section)
if entry := self.get(hash); entry != nil {
self.activateChain(entry.section, newp, newp.switchC, connected)
connected[hash] = entry.section
sections = append(sections, hash)
}
}
}
glog.V(logger.Detail).Infof("<%s> section processes (%v non-contiguous sequences, was %v before)", newp.id, len(sections), len(newp.sections))
// need to lock now that newp is exposed to section processesr
newp.lock.Lock()
newp.sections = sections
newp.lock.Unlock()
}
// finally deactivate section process for sections where newp didnt activate
// newp activating section process changes the quit channel for this reason
if oldp != nil {
glog.V(logger.Detail).Infof("<%s> quit section processes", oldp.id)
close(oldp.idleC)
}
}
// getPeer looks up peer by id, returns peer and a bool value
// that is true iff peer is current best peer
func (self *peers) getPeer(id string) (p *peer, best bool) {
self.lock.RLock()
defer self.lock.RUnlock()
if self.best != nil && self.best.id == id {
return self.best, true
}
p = self.peers[id]
return
}
// head section process
func (self *peer) handleSection(sec *section) {
self.lock.Lock()
defer self.lock.Unlock()
glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section received [%s]-[%s]", self.id, hex(self.currentBlockHash), sectionhex(self.headSection), sectionhex(sec))
self.headSection = sec
self.blockHashesRequestTimer = nil
if sec == nil {
if self.idle {
self.idle = false
self.bp.wg.Add(1)
self.bp.syncing()
}
self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
self.bestIdleTimer = nil
glog.V(logger.Detail).Infof("HeadSection: <%s> head block hash changed (mined block received). New head %s", self.id, hex(self.currentBlockHash))
} else {
if !self.idle {
self.idle = true
self.bp.wg.Done()
}
self.headInfoTimer = nil
self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout)
glog.V(logger.Detail).Infof("HeadSection: <%s> (head: %s) head section [%s] created. Idle...", self.id, hex(self.currentBlockHash), sectionhex(sec))
}
}
func (self *peer) getCurrentBlock(currentBlock *types.Block) {
// called by update or after AddBlock signals that head block of current peer is received
self.lock.Lock()
defer self.lock.Unlock()
if currentBlock == nil {
if entry := self.bp.get(self.currentBlockHash); entry != nil {
entry.node.lock.Lock()
currentBlock = entry.node.block
entry.node.lock.Unlock()
}
if currentBlock != nil {
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found in blockpool", self.id, hex(self.currentBlockHash))
} else {
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s not found... requesting it", self.id, hex(self.currentBlockHash))
self.requestBlocks([]common.Hash{self.currentBlockHash})
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
return
}
} else {
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s received (parent: %s)", self.id, hex(self.currentBlockHash), hex(currentBlock.ParentHash()))
}
self.currentBlock = currentBlock
self.parentHash = currentBlock.ParentHash()
glog.V(logger.Detail).Infof("HeadSection: <%s> head block %s found (parent: %s)... requesting hashes", self.id, hex(self.currentBlockHash), hex(self.parentHash))
self.blockHashesRequestTimer = time.After(0)
self.blocksRequestTimer = nil
}
func (self *peer) getBlockHashes() bool {
self.lock.Lock()
defer self.lock.Unlock()
//if connecting parent is found
if self.bp.hasBlock(self.parentHash) {
glog.V(logger.Detail).Infof("HeadSection: <%s> parent block %s found in blockchain", self.id, hex(self.parentHash))
err := self.bp.insertChain(types.Blocks([]*types.Block{self.currentBlock}))
self.bp.status.lock.Lock()
self.bp.status.values.BlocksInChain++
self.bp.status.values.BlocksInPool--
if err != nil {
self.addError(ErrInvalidBlock, "%v", err)
self.bp.status.badPeers[self.id]++
} else {
// XXX added currentBlock check (?)
if self.currentBlock != nil && self.currentBlock.Td != nil && !self.currentBlock.Queued() {
glog.V(logger.Detail).Infof("HeadSection: <%s> inserted %s to blockchain... check TD %v =?= %v", self.id, hex(self.parentHash), self.td, self.currentBlock.Td)
if self.td.Cmp(self.currentBlock.Td) != 0 {
self.addError(ErrIncorrectTD, "on block %x %v =?= %v", hex(self.parentHash), self.td, self.currentBlock.Td)
self.bp.status.badPeers[self.id]++
}
}
headKey := self.parentHash
height := self.bp.status.chain[headKey] + 1
self.bp.status.chain[self.currentBlockHash] = height
if height > self.bp.status.values.LongestChain {
self.bp.status.values.LongestChain = height
}
delete(self.bp.status.chain, headKey)
}
self.bp.status.lock.Unlock()
} else {
if parent := self.bp.get(self.parentHash); parent != nil {
if self.bp.get(self.currentBlockHash) == nil {
glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool... creating singleton section", self.id, hex(self.parentHash))
self.bp.nodeCacheLock.Lock()
n, ok := self.bp.nodeCache[self.currentBlockHash]
if !ok {
panic("not found in nodeCache")
}
self.bp.nodeCacheLock.Unlock()
self.bp.newSection([]*node{n}).activate(self)
} else {
glog.V(logger.Detail).Infof("HeadSection: <%s> connecting parent %s found in pool...head section [%s] exists...not requesting hashes", self.id, hex(self.parentHash), sectionhex(parent.section))
self.bp.activateChain(parent.section, self, self.switchC, nil)
}
} else {
glog.V(logger.Detail).Infof("HeadSection: <%s> section [%s] requestBlockHashes", self.id, sectionhex(self.headSection))
self.requestBlockHashes(self.currentBlockHash)
self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
return false
}
}
self.blockHashesRequestTimer = nil
if !self.idle {
self.idle = true
self.headInfoTimer = nil
self.bestIdleTimer = time.After(self.bp.Config.IdleBestPeerTimeout)
self.bp.wg.Done()
}
return true
}
// main loop for head section process
func (self *peer) run() {
self.blocksRequestTimer = time.After(0)
self.headInfoTimer = time.After(self.bp.Config.BlockHashesTimeout)
self.bestIdleTimer = nil
var ping = time.NewTicker(5 * time.Second)
LOOP:
for {
select {
// to minitor section process behaviour
case <-ping.C:
glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s, idle: %v", self.id, hex(self.currentBlockHash), self.idle)
// signal from AddBlockHashes that head section for current best peer is created
// if sec == nil, it signals that chain info has updated (new block message)
case sec := <-self.headSectionC:
self.handleSection(sec)
// periodic check for block hashes or parent block/section
case <-self.blockHashesRequestTimer:
self.getBlockHashes()
// signal from AddBlock that head block of current best peer has been received
case currentBlock := <-self.currentBlockC:
self.getCurrentBlock(currentBlock)
// keep requesting until found or timed out
case <-self.blocksRequestTimer:
self.getCurrentBlock(nil)
// quitting on timeout
case <-self.headInfoTimer:
self.peerError(self.bp.peers.errors.New(ErrInsufficientChainInfo, "timed out without providing block hashes or head block (td: %v, head: %s)", self.td, hex(self.currentBlockHash)))
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
// there is no persistence here, so GC will just take care of cleaning up
// signal for peer switch, quit
case <-self.switchC:
var complete = "incomplete "
if self.idle {
complete = "complete"
}
glog.V(logger.Detail).Infof("HeadSection: <%s> section with head %s %s... quit request loop due to peer switch", self.id, hex(self.currentBlockHash), complete)
break LOOP
// global quit for blockpool
case <-self.bp.quit:
break LOOP
// best
case <-self.bestIdleTimer:
self.peerError(self.bp.peers.errors.New(ErrIdleTooLong, "timed out without providing new blocks (td: %v, head: %s)...quitting", self.td, hex(self.currentBlockHash)))
self.bp.status.lock.Lock()
self.bp.status.badPeers[self.id]++
self.bp.status.lock.Unlock()
glog.V(logger.Detail).Infof("HeadSection: <%s> (headsection [%s]) quit channel closed : timed out without providing new blocks...quitting", self.id, sectionhex(self.headSection))
}
}
if !self.idle {
self.idle = true
self.bp.wg.Done()
}
}

View File

@ -1,211 +0,0 @@
package blockpool
import (
"flag"
"math/big"
"testing"
"time"
"github.com/ethereum/go-ethereum/core"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
var (
_ = flag.Set("alsologtostderr", "true")
// _ = flag.Set("log_dir", ".")
_ = flag.Set("v", "5")
)
// the actual tests
func TestAddPeer(t *testing.T) {
glog.V(logger.Error).Infoln("logging...")
hashPool, blockPool, blockPoolTester := newTestBlockPool(t)
peer0 := blockPoolTester.newPeer("peer0", 2, 2)
peer1 := blockPoolTester.newPeer("peer1", 4, 4)
peer2 := blockPoolTester.newPeer("peer2", 6, 6)
var bestpeer *peer
blockPool.Start()
// pool
best := peer0.AddPeer()
if !best {
t.Errorf("peer0 (TD=2) not accepted as best")
return
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("peer0 (TD=2) not set as best")
return
}
peer0.serveBlocks(1, 2)
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=6) not accepted as best")
return
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=6) not set as best")
return
}
peer2.serveBlocks(5, 6)
best = peer1.AddPeer()
if best {
t.Errorf("peer1 (TD=4) accepted as best")
return
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=6) not set any more as best")
return
}
if blockPool.peers.best.td.Cmp(big.NewInt(int64(6))) != 0 {
t.Errorf("peer2 TD=6 not set")
return
}
peer2.td = 8
peer2.currentBlock = 8
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=8) not accepted as best")
return
}
if blockPool.peers.best.id != "peer2" {
t.Errorf("peer2 (TD=8) not set as best")
return
}
if blockPool.peers.best.td.Cmp(big.NewInt(int64(8))) != 0 {
t.Errorf("peer2 TD = 8 not updated")
return
}
peer1.td = 6
peer1.currentBlock = 6
best = peer1.AddPeer()
if best {
t.Errorf("peer1 (TD=6) should not be set as best")
return
}
if blockPool.peers.best.id == "peer1" {
t.Errorf("peer1 (TD=6) should not be set as best")
return
}
bestpeer, best = blockPool.peers.getPeer("peer1")
if bestpeer.td.Cmp(big.NewInt(int64(6))) != 0 {
t.Errorf("peer1 TD=6 should be updated")
return
}
blockPool.RemovePeer("peer2")
bestpeer, best = blockPool.peers.getPeer("peer2")
if bestpeer != nil {
t.Errorf("peer2 not removed")
return
}
if blockPool.peers.best.id != "peer1" {
t.Errorf("existing peer1 (TD=6) should be set as best peer")
return
}
blockPool.RemovePeer("peer1")
bestpeer, best = blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed")
return
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("existing peer0 (TD=2) should be set as best peer")
return
}
blockPool.RemovePeer("peer0")
bestpeer, best = blockPool.peers.getPeer("peer0")
if bestpeer != nil {
t.Errorf("peer0 not removed")
return
}
// adding back earlier peer ok
peer0.currentBlock = 5
peer0.td = 5
best = peer0.AddPeer()
if !best {
t.Errorf("peer0 (TD=5) should be set as best")
return
}
if blockPool.peers.best.id != "peer0" {
t.Errorf("peer0 (TD=5) should be set as best")
return
}
peer0.serveBlocks(4, 5)
hash := hashPool.IndexesToHashes([]int{6})[0]
newblock := &types.Block{Td: big.NewInt(int64(6)), HeaderHash: hash}
blockPool.chainEvents.Post(core.ChainHeadEvent{newblock})
time.Sleep(100 * time.Millisecond)
if blockPool.peers.best != nil {
t.Errorf("no peer should be ahead of self")
return
}
best = peer1.AddPeer()
if blockPool.peers.best != nil {
t.Errorf("after peer1 (TD=6) still no peer should be ahead of self")
return
}
best = peer2.AddPeer()
if !best {
t.Errorf("peer2 (TD=8) not accepted as best")
return
}
blockPool.RemovePeer("peer2")
if blockPool.peers.best != nil {
t.Errorf("no peer should be ahead of self")
return
}
blockPool.Stop()
}
func TestPeerPromotionByTdOnBlock(t *testing.T) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(4)
peer0 := blockPoolTester.newPeer("peer0", 2, 2)
peer1 := blockPoolTester.newPeer("peer1", 1, 1)
peer2 := blockPoolTester.newPeer("peer2", 4, 4)
blockPool.Start()
peer0.AddPeer()
peer0.serveBlocks(1, 2)
best := peer1.AddPeer()
// this tests that peer1 is not promoted over peer0 yet
if best {
t.Errorf("peer1 (TD=1) should not be set as best")
return
}
best = peer2.AddPeer()
peer2.serveBlocks(3, 4)
peer2.serveBlockHashes(4, 3, 2, 1)
peer1.sendBlocks(3, 4)
blockPool.RemovePeer("peer2")
if blockPool.peers.best.id != "peer1" {
t.Errorf("peer1 (TD=3) should be set as best")
return
}
peer1.serveBlocks(0, 1, 2, 3)
blockPool.Wait(waitTimeout)
blockPool.Stop()
blockPoolTester.refBlockChain[4] = []int{}
blockPoolTester.checkBlockChain(blockPoolTester.refBlockChain)
}

View File

@ -1,673 +0,0 @@
package blockpool
import (
"sync"
"time"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog"
)
/*
section is the worker on each chain section in the block pool
- remove the section if there are blocks missing after an absolute time
- remove the section if there are maxIdleRounds of idle rounds of block requests with no response
- periodically polls the chain section for missing blocks which are then requested from peers
- registers the process controller on the peer so that if the peer is promoted as best peer the second time (after a disconnect of a better one), all active processes are switched back on unless they removed (inserted in blockchain, invalid or expired)
- when turned off (if peer disconnects and new peer connects with alternative chain), no blockrequests are made but absolute expiry timer is ticking
- when turned back on it recursively calls itself on the root of the next chain section
*/
type section struct {
lock sync.RWMutex
parent *section // connecting section back in time towards blockchain
child *section // connecting section forward in time
top *node // the topmost node = head node = youngest node within the chain section
bottom *node // the bottom node = root node = oldest node within the chain section
nodes []*node
peer *peer
parentHash common.Hash
blockHashes []common.Hash
poolRootIndex int
bp *BlockPool
controlC chan *peer // to (de)register the current best peer
poolRootC chan *peer // indicate connectedness to blockchain (well, known blocks)
offC chan bool // closed if process terminated
suicideC chan bool // initiate suicide on the section
quitInitC chan bool // to signal end of initialisation
forkC chan chan bool // freeze section process while splitting
switchC chan bool // switching
idleC chan bool // channel to indicate thai food
processC chan *node //
missingC chan *node //
blocksRequestTimer <-chan time.Time
blockHashesRequestTimer <-chan time.Time
suicideTimer <-chan time.Time
blocksRequests int
blockHashesRequests int
blocksRequestsComplete bool
blockHashesRequestsComplete bool
ready bool
same bool
initialised bool
active bool
step int
idle int
missing int
lastMissing int
depth int
invalid bool
poolRoot bool
}
//
func (self *BlockPool) newSection(nodes []*node) *section {
sec := &section{
bottom: nodes[len(nodes)-1],
top: nodes[0],
nodes: nodes,
poolRootIndex: len(nodes),
bp: self,
controlC: make(chan *peer),
poolRootC: make(chan *peer),
offC: make(chan bool),
}
for i, n := range nodes {
entry := &entry{node: n, section: sec, index: &index{i}}
self.set(n.hash, entry)
}
glog.V(logger.Detail).Infof("[%s] setup section process", sectionhex(sec))
go sec.run()
return sec
}
func (self *section) addSectionToBlockChain(p *peer) {
self.bp.wg.Add(1)
go func() {
self.lock.Lock()
defer self.lock.Unlock()
defer func() {
self.bp.wg.Done()
}()
var nodes []*node
var n *node
var keys []common.Hash
var blocks []*types.Block
for self.poolRootIndex > 0 {
n = self.nodes[self.poolRootIndex-1]
n.lock.RLock()
block := n.block
n.lock.RUnlock()
if block == nil {
break
}
self.poolRootIndex--
keys = append(keys, n.hash)
blocks = append(blocks, block)
nodes = append(nodes, n)
}
if len(blocks) == 0 {
return
}
self.bp.lock.Lock()
for _, key := range keys {
delete(self.bp.pool, key)
}
self.bp.lock.Unlock()
glog.V(logger.Debug).Infof("[%s] insert %v blocks [%v/%v] into blockchain", sectionhex(self), len(blocks), hex(blocks[0].Hash()), hex(blocks[len(blocks)-1].Hash()))
err := self.bp.insertChain(blocks)
if err != nil {
self.invalid = true
self.bp.peers.peerError(n.blockBy, ErrInvalidBlock, "%v", err)
glog.V(logger.Error).Infof("invalid block %x", n.hash)
glog.V(logger.Error).Infof("penalise peers %v (hash), %v (block)", n.hashBy, n.blockBy)
// or invalid block and the entire chain needs to be removed
self.removeChain()
} else {
// check tds
self.bp.wg.Add(1)
go func() {
self.bp.checkTD(nodes...)
self.bp.wg.Done()
}()
// if all blocks inserted in this section
// then need to try to insert blocks in child section
if self.poolRootIndex == 0 {
// if there is a child section, then recursively call itself:
// also if section process is not terminated,
// then signal blockchain connectivity with poolRootC
if child := self.bp.getChild(self); child != nil {
select {
case <-child.offC:
glog.V(logger.Detail).Infof("[%s] add complete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
case child.poolRootC <- p:
glog.V(logger.Detail).Infof("[%s] add incomplete child section [%s] to the blockchain", sectionhex(self), sectionhex(child))
}
child.addSectionToBlockChain(p)
} else {
glog.V(logger.Detail).Infof("[%s] no child section in pool", sectionhex(self))
}
glog.V(logger.Detail).Infof("[%s] section completely inserted to blockchain - remove", sectionhex(self))
// complete sections are removed. if called from within section process,
// this must run in its own go routine to avoid deadlock
self.remove()
}
}
self.bp.status.lock.Lock()
if err == nil {
headKey := blocks[0].ParentHash()
height := self.bp.status.chain[headKey] + len(blocks)
self.bp.status.chain[blocks[len(blocks)-1].Hash()] = height
if height > self.bp.status.values.LongestChain {
self.bp.status.values.LongestChain = height
}
delete(self.bp.status.chain, headKey)
}
self.bp.status.values.BlocksInChain += len(blocks)
self.bp.status.values.BlocksInPool -= len(blocks)
if err != nil {
self.bp.status.badPeers[n.blockBy]++
}
self.bp.status.lock.Unlock()
}()
}
func (self *section) run() {
// absolute time after which sub-chain is killed if not complete (some blocks are missing)
self.suicideC = make(chan bool)
self.forkC = make(chan chan bool)
self.suicideTimer = time.After(self.bp.Config.BlocksTimeout)
// node channels for the section
// container for missing block hashes
var checking bool
var ping = time.NewTicker(5 * time.Second)
LOOP:
for !self.blockHashesRequestsComplete || !self.blocksRequestsComplete {
select {
case <-ping.C:
var name = "no peer"
if self.peer != nil {
name = self.peer.id
}
glog.V(logger.Detail).Infof("[%s] peer <%s> active: %v", sectionhex(self), name, self.active)
// global quit from blockpool
case <-self.bp.quit:
break LOOP
// pause for peer switching
case <-self.switchC:
self.switchC = nil
case p := <-self.poolRootC:
// signal on pool root channel indicates that the blockpool is
// connected to the blockchain, insert the longest chain of blocks
// ignored in idle mode to avoid inserting chain sections of non-live peers
self.poolRoot = true
// switch off hash requests in case they were on
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
self.switchOn(p)
// peer quit or demoted, put section in idle mode
case <-self.idleC:
// peer quit or demoted, put section in idle mode
glog.V(logger.Debug).Infof("[%s] peer <%s> quit or demoted", sectionhex(self), self.peer.id)
self.switchOff()
self.idleC = nil
// timebomb - if section is not complete in time, nuke the entire chain
case <-self.suicideTimer:
self.removeChain()
glog.V(logger.Debug).Infof("[%s] timeout. (%v total attempts): missing %v/%v/%v...suicide", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.suicideTimer = nil
break LOOP
// closing suicideC triggers section suicide: removes section nodes from pool and terminates section process
case <-self.suicideC:
glog.V(logger.Detail).Infof("[%s] quit", sectionhex(self))
break LOOP
// alarm for checking blocks in the section
case <-self.blocksRequestTimer:
glog.V(logger.Detail).Infof("[%s] alarm: block request time", sectionhex(self))
self.processC = self.missingC
// alarm for checking parent of the section or sending out hash requests
case <-self.blockHashesRequestTimer:
glog.V(logger.Detail).Infof("[%s] alarm: hash request time", sectionhex(self))
self.blockHashesRequest()
// activate this section process with a peer
case p := <-self.controlC:
if p == nil {
self.switchOff()
} else {
self.switchOn(p)
}
self.bp.wg.Done()
// blocks the process until section is split at the fork
case waiter := <-self.forkC:
<-waiter
self.initialised = false
self.quitInitC = nil
//
case n, ok := <-self.processC:
// channel closed, first iteration finished
if !ok && !self.initialised {
glog.V(logger.Detail).Infof("[%s] section initalised: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
self.initialised = true
self.processC = nil
self.checkRound()
checking = false
break
}
if !checking {
self.step = 0
self.missing = 0
checking = true
}
self.step++
n.lock.RLock()
block := n.block
n.lock.RUnlock()
// if node has no block, request it (buffer it for batch request)
// feed it to missingC channel for the next round
if block == nil {
pos := self.missing % self.bp.Config.BlockBatchSize
if pos == 0 {
if self.missing != 0 {
self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:])
}
self.blockHashes = self.bp.getHashSlice()
}
self.blockHashes[pos] = n.hash
self.missing++
self.missingC <- n
} else {
// checking for parent block
if self.poolRoot {
// if node has got block (received via async AddBlock call from protocol)
if self.step == self.lastMissing {
// current root of the pool
glog.V(logger.Detail).Infof("[%s] received block for current pool root %s", sectionhex(self), hex(n.hash))
self.addSectionToBlockChain(self.peer)
}
} else {
if (self.parentHash == common.Hash{}) && n == self.bottom {
self.parentHash = block.ParentHash()
glog.V(logger.Detail).Infof("[%s] got parent head block hash %s...checking", sectionhex(self), hex(self.parentHash))
self.blockHashesRequest()
}
}
}
if self.initialised && self.step == self.lastMissing {
glog.V(logger.Detail).Infof("[%s] check if new blocks arrived (attempt %v): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.checkRound()
checking = false
}
} // select
} // for
close(self.offC)
if self.peer != nil {
self.active = false
self.bp.wg.Done()
}
glog.V(logger.Detail).Infof("[%s] section process terminated: %v blocks retrieved (%v attempts), hash requests complete on root (%v attempts).", sectionhex(self), self.depth, self.blocksRequests, self.blockHashesRequests)
}
func (self *section) switchOn(newpeer *peer) {
oldpeer := self.peer
// reset switchC/switchC to current best peer
self.idleC = newpeer.idleC
self.switchC = newpeer.switchC
self.peer = newpeer
if oldpeer != newpeer {
oldp := "no peer"
newp := "no peer"
if oldpeer != nil {
oldp = oldpeer.id
}
if newpeer != nil {
newp = newpeer.id
}
glog.V(logger.Detail).Infof("[%s] active mode <%s> -> <%s>", sectionhex(self), oldp, newp)
}
// activate section with current peer
if oldpeer == nil {
self.bp.wg.Add(1)
self.active = true
if !self.blockHashesRequestsComplete {
self.blockHashesRequestTimer = time.After(0)
}
if !self.blocksRequestsComplete {
if !self.initialised {
if self.quitInitC != nil {
<-self.quitInitC
}
self.missingC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
self.processC = make(chan *node, self.bp.Config.BlockHashesBatchSize)
self.quitInitC = make(chan bool)
self.step = 0
self.missing = 0
self.depth = len(self.nodes)
self.lastMissing = self.depth
self.feedNodes()
} else {
self.blocksRequestTimer = time.After(0)
}
}
}
}
// put the section to idle mode
func (self *section) switchOff() {
// active -> idle
if self.peer != nil {
oldp := "no peer"
oldpeer := self.peer
if oldpeer != nil {
oldp = oldpeer.id
}
glog.V(logger.Detail).Infof("[%s] idle mode peer <%s> -> <> (%v total attempts): missing %v/%v/%v", sectionhex(self), oldp, self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.active = false
self.peer = nil
// turn off timers
self.blocksRequestTimer = nil
self.blockHashesRequestTimer = nil
if self.quitInitC != nil {
<-self.quitInitC
self.quitInitC = nil
}
self.processC = nil
self.bp.wg.Done()
}
}
// iterates through nodes of a section to feed processC
// used to initialise chain section
func (self *section) feedNodes() {
// if not run at least once fully, launch iterator
self.bp.wg.Add(1)
go func() {
self.lock.Lock()
defer self.lock.Unlock()
defer func() {
self.bp.wg.Done()
}()
var n *node
INIT:
for _, n = range self.nodes {
select {
case self.processC <- n:
case <-self.bp.quit:
break INIT
}
}
close(self.processC)
close(self.quitInitC)
}()
}
func (self *section) blockHashesRequest() {
if self.switchC != nil {
self.bp.chainLock.Lock()
parentSection := self.parent
if parentSection == nil {
// only link to new parent if not switching peers
if (self.parentHash != common.Hash{}) {
if parent := self.bp.get(self.parentHash); parent != nil {
parentSection = parent.section
glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parent section [%s] linked\n", sectionhex(self), sectionhex(parentSection))
link(parentSection, self)
} else {
if self.bp.hasBlock(self.parentHash) {
self.poolRoot = true
glog.V(logger.Detail).Infof("[%s] blockHashesRequest: parentHash known ... inserting section in blockchain", sectionhex(self))
self.addSectionToBlockChain(self.peer)
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
}
}
}
}
self.bp.chainLock.Unlock()
if !self.poolRoot {
if parentSection != nil {
// activate parent section with this peer
// but only if not during switch mode
glog.V(logger.Detail).Infof("[%s] parent section [%s] activated\n", sectionhex(self), sectionhex(parentSection))
self.bp.activateChain(parentSection, self.peer, self.peer.switchC, nil)
// if not root of chain, switch off
glog.V(logger.Detail).Infof("[%s] parent found, hash requests deactivated (after %v total attempts)\n", sectionhex(self), self.blockHashesRequests)
self.blockHashesRequestTimer = nil
self.blockHashesRequestsComplete = true
} else {
self.blockHashesRequests++
glog.V(logger.Detail).Infof("[%s] hash request on root (%v total attempts)\n", sectionhex(self), self.blockHashesRequests)
self.peer.requestBlockHashes(self.bottom.hash)
self.blockHashesRequestTimer = time.After(self.bp.Config.BlockHashesRequestInterval)
}
}
}
}
// checks number of missing blocks after each round of request and acts accordingly
func (self *section) checkRound() {
if self.missing == 0 {
// no missing blocks
glog.V(logger.Detail).Infof("[%s] section checked: got all blocks. process complete (%v total blocksRequests): missing %v/%v/%v", sectionhex(self), self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.blocksRequestsComplete = true
self.blocksRequestTimer = nil
} else {
// some missing blocks
glog.V(logger.Detail).Infof("[%s] section checked: missing %v/%v/%v", sectionhex(self), self.missing, self.lastMissing, self.depth)
self.blocksRequests++
pos := self.missing % self.bp.Config.BlockBatchSize
if pos == 0 {
pos = self.bp.Config.BlockBatchSize
}
self.bp.requestBlocks(self.blocksRequests, self.blockHashes[:pos])
// handle idle rounds
if self.missing == self.lastMissing {
// idle round
if self.same {
// more than once
self.idle++
// too many idle rounds
if self.idle >= self.bp.Config.BlocksRequestMaxIdleRounds {
glog.V(logger.Detail).Infof("[%s] block requests had %v idle rounds (%v total attempts): missing %v/%v/%v\ngiving up...", sectionhex(self), self.idle, self.blocksRequests, self.missing, self.lastMissing, self.depth)
self.removeChain()
}
} else {
self.idle = 0
}
self.same = true
} else {
self.same = false
}
self.lastMissing = self.missing
// put processC offline
self.processC = nil
self.blocksRequestTimer = time.After(self.bp.Config.BlocksRequestInterval)
}
}
/*
link connects two sections via parent/child fields
creating a doubly linked list
caller must hold BlockPool chainLock
*/
func link(parent *section, child *section) {
if parent != nil {
exChild := parent.child
parent.child = child
if exChild != nil && exChild != child {
if child != nil {
// if child is nil it is not a real fork
glog.V(logger.Detail).Infof("[%s] chain fork [%s] -> [%s]", sectionhex(parent), sectionhex(exChild), sectionhex(child))
}
exChild.parent = nil
}
}
if child != nil {
exParent := child.parent
if exParent != nil && exParent != parent {
if parent != nil {
// if parent is nil it is not a real fork, but suicide delinking section
glog.V(logger.Detail).Infof("[%s] chain reverse fork [%s] -> [%s]", sectionhex(child), sectionhex(exParent), sectionhex(parent))
}
exParent.child = nil
}
child.parent = parent
}
}
/*
handle forks where connecting node is mid-section
by splitting section at fork
no splitting needed if connecting node is head of a section
caller must hold chain lock
*/
func (self *BlockPool) splitSection(parent *section, entry *entry) {
glog.V(logger.Detail).Infof("[%s] split section at fork", sectionhex(parent))
parent.deactivate()
waiter := make(chan bool)
parent.wait(waiter)
chain := parent.nodes
parent.nodes = chain[entry.index.int:]
parent.top = parent.nodes[0]
parent.poolRootIndex -= entry.index.int
orphan := self.newSection(chain[0:entry.index.int])
link(orphan, parent.child)
close(waiter)
orphan.deactivate()
}
func (self *section) wait(waiter chan bool) {
self.forkC <- waiter
}
func (self *BlockPool) linkSections(nodes []*node, parent, child *section) (sec *section) {
// if new section is created, link it to parent/child sections
// and launch section process fetching block and further hashes
if len(nodes) > 0 {
sec = self.newSection(nodes)
glog.V(logger.Debug).Infof("[%s]->[%s](%v)->[%s] new chain section", sectionhex(parent), sectionhex(sec), len(nodes), sectionhex(child))
link(parent, sec)
link(sec, child)
} else {
if parent != nil && child != nil {
// now this can only happen if we allow response to hash request to include <from> hash
// in this case we just link parent and child (without needing root block of child section)
glog.V(logger.Debug).Infof("[%s]->[%s] connecting known sections", sectionhex(parent), sectionhex(child))
link(parent, child)
}
}
return
}
func (self *section) activate(p *peer) {
self.bp.wg.Add(1)
select {
case <-self.offC:
glog.V(logger.Detail).Infof("[%s] completed section process. cannot activate for peer <%s>", sectionhex(self), p.id)
self.bp.wg.Done()
case self.controlC <- p:
glog.V(logger.Detail).Infof("[%s] activate section process for peer <%s>", sectionhex(self), p.id)
}
}
func (self *section) deactivate() {
self.bp.wg.Add(1)
self.controlC <- nil
}
// removes this section exacly
func (self *section) remove() {
select {
case <-self.offC:
close(self.suicideC)
glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self))
case <-self.suicideC:
glog.V(logger.Detail).Infof("[%s] remove: suicided already", sectionhex(self))
default:
glog.V(logger.Detail).Infof("[%s] remove: suicide", sectionhex(self))
close(self.suicideC)
}
self.unlink()
self.bp.remove(self)
glog.V(logger.Detail).Infof("[%s] removed section.", sectionhex(self))
}
// remove a section and all its descendents from the pool
func (self *section) removeChain() {
// need to get the child before removeSection delinks the section
self.bp.chainLock.RLock()
child := self.child
self.bp.chainLock.RUnlock()
glog.V(logger.Detail).Infof("[%s] remove chain", sectionhex(self))
self.remove()
if child != nil {
child.removeChain()
}
}
// unlink a section from its parent/child
func (self *section) unlink() {
// first delink from child and parent under chainlock
self.bp.chainLock.Lock()
link(nil, self)
link(self, nil)
self.bp.chainLock.Unlock()
}

View File

@ -1,111 +0,0 @@
package blockpool
import (
"fmt"
"sync"
"github.com/ethereum/go-ethereum/common"
)
type statusValues struct {
BlockHashes int // number of hashes fetched this session
BlockHashesInPool int // number of hashes currently in the pool
Blocks int // number of blocks fetched this session
BlocksInPool int // number of blocks currently in the pool
BlocksInChain int // number of blocks inserted/connected to the blockchain this session
NewBlocks int // number of new blocks (received with new blocks msg) this session
Forks int // number of chain forks in the blockchain (poolchain) this session
LongestChain int // the longest chain inserted since the start of session (aka session blockchain height)
BestPeer []byte //Pubkey
Syncing bool // requesting, updating etc
Peers int // cumulative number of all different registered peers since the start of this session
ActivePeers int // cumulative number of all different peers that contributed a hash or block since the start of this session
LivePeers int // number of live peers registered with the block pool (supposed to be redundant but good sanity check
BestPeers int // cumulative number of all peers that at some point were promoted as best peer (peer with highest TD status) this session
BadPeers int // cumulative number of all peers that violated the protocol (invalid block or pow, unrequested hash or block, etc)
}
type status struct {
lock sync.Mutex
values statusValues
chain map[common.Hash]int
peers map[string]int
bestPeers map[string]int
badPeers map[string]int
activePeers map[string]int
}
func newStatus() *status {
return &status{
chain: make(map[common.Hash]int),
peers: make(map[string]int),
bestPeers: make(map[string]int),
badPeers: make(map[string]int),
activePeers: make(map[string]int),
}
}
type Status struct {
statusValues
}
// blockpool status for reporting
func (self *BlockPool) Status() *Status {
self.status.lock.Lock()
defer self.status.lock.Unlock()
self.status.values.ActivePeers = len(self.status.activePeers)
self.status.values.BestPeers = len(self.status.bestPeers)
self.status.values.BadPeers = len(self.status.badPeers)
self.status.values.LivePeers = len(self.peers.peers)
self.status.values.Peers = len(self.status.peers)
self.status.values.BlockHashesInPool = len(self.pool)
return &Status{self.status.values}
}
func (self *Status) String() string {
return fmt.Sprintf(`
Syncing: %v
BlockHashes: %v
BlockHashesInPool: %v
Blocks: %v
BlocksInPool: %v
BlocksInChain: %v
NewBlocks: %v
Forks: %v
LongestChain: %v
Peers: %v
LivePeers: %v
ActivePeers: %v
BestPeers: %v
BadPeers: %v
`,
self.Syncing,
self.BlockHashes,
self.BlockHashesInPool,
self.Blocks,
self.BlocksInPool,
self.BlocksInChain,
self.NewBlocks,
self.Forks,
self.LongestChain,
self.Peers,
self.LivePeers,
self.ActivePeers,
self.BestPeers,
self.BadPeers,
)
}
func (self *BlockPool) syncing() {
self.status.lock.Lock()
defer self.status.lock.Unlock()
if !self.status.values.Syncing {
self.status.values.Syncing = true
go func() {
self.wg.Wait()
self.status.lock.Lock()
self.status.values.Syncing = false
self.status.lock.Unlock()
}()
}
}

View File

@ -1,244 +0,0 @@
package blockpool
import (
"fmt"
"testing"
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
)
var statusFields = []string{
"BlockHashes",
"BlockHashesInPool",
"Blocks",
"BlocksInPool",
"BlocksInChain",
"NewBlocks",
"Forks",
"LongestChain",
"Peers",
"LivePeers",
"ActivePeers",
"BestPeers",
"BadPeers",
}
func getStatusValues(s *Status) []int {
return []int{
s.BlockHashes,
s.BlockHashesInPool,
s.Blocks,
s.BlocksInPool,
s.BlocksInChain,
s.NewBlocks,
s.Forks,
s.LongestChain,
s.Peers,
s.LivePeers,
s.ActivePeers,
s.BestPeers,
s.BadPeers,
}
}
func checkStatus(t *testing.T, bp *BlockPool, syncing bool, expected []int) (err error) {
s := bp.Status()
if s.Syncing != syncing {
err = fmt.Errorf("status for Syncing incorrect. expected %v, got %v", syncing, s.Syncing)
return
}
got := getStatusValues(s)
for i, v := range expected {
err = test.CheckInt(statusFields[i], got[i], v, t)
if err != nil {
return
}
}
return
}
func TestBlockPoolStatus(t *testing.T) {
var err error
n := 3
for n > 0 {
n--
err = testBlockPoolStatus(t)
if err != nil {
t.Log(err)
continue
} else {
return
}
}
if err != nil {
t.Errorf("no pass out of 3: %v", err)
}
}
func testBlockPoolStatus(t *testing.T) (err error) {
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPoolTester.blockChain[0] = nil
blockPoolTester.initRefBlockChain(12)
blockPoolTester.refBlockChain[3] = []int{4, 7}
blockPoolTester.refBlockChain[5] = []int{10}
blockPoolTester.refBlockChain[6] = []int{11}
blockPoolTester.refBlockChain[9] = []int{6}
delete(blockPoolTester.refBlockChain, 10)
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 9, 9)
peer2 := blockPoolTester.newPeer("peer2", 10, 10)
peer3 := blockPoolTester.newPeer("peer3", 11, 11)
peer4 := blockPoolTester.newPeer("peer4", 9, 9)
peer2.blocksRequestsMap = peer1.blocksRequestsMap
var expected []int
expected = []int{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0}
err = checkStatus(nil, blockPool, false, expected)
if err != nil {
return
}
peer1.AddPeer()
expected = []int{0, 0, 0, 0, 0, 1, 0, 0, 1, 1, 0, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlocks(8, 9)
expected = []int{1, 0, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlockHashes(9, 8, 7, 3, 2)
expected = []int{5, 5, 1, 1, 0, 1, 0, 0, 1, 1, 1, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlocks(3, 7, 8)
expected = []int{5, 5, 3, 3, 0, 1, 0, 0, 1, 1, 1, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlocks(2, 3)
expected = []int{5, 5, 4, 4, 0, 1, 0, 0, 1, 1, 1, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer4.AddPeer()
expected = []int{5, 5, 4, 4, 0, 2, 0, 0, 2, 2, 1, 1, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer2.AddPeer()
expected = []int{5, 5, 4, 4, 0, 3, 0, 0, 3, 3, 1, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer2.serveBlocks(5, 10)
peer2.serveBlockHashes(10, 5, 4, 3, 2)
expected = []int{8, 8, 5, 5, 0, 3, 1, 0, 3, 3, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer2.serveBlocks(2, 3, 4)
expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 3, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
blockPool.RemovePeer("peer2")
expected = []int{8, 8, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlockHashes(2, 1, 0)
expected = []int{9, 9, 6, 6, 0, 3, 1, 0, 3, 2, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlocks(1, 2)
expected = []int{9, 9, 7, 7, 0, 3, 1, 0, 3, 2, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer1.serveBlocks(4, 5)
expected = []int{9, 9, 8, 8, 0, 3, 1, 0, 3, 2, 2, 2, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer3.AddPeer()
expected = []int{9, 9, 8, 8, 0, 4, 1, 0, 4, 3, 2, 3, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer3.serveBlocks(6, 11)
expected = []int{10, 9, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer3.serveBlockHashes(11, 6, 9)
expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 3, 3, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer4.sendBlocks(11, 12)
expected = []int{11, 11, 9, 9, 0, 4, 1, 0, 4, 3, 4, 3, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer3.serveBlocks(9, 6)
expected = []int{11, 11, 10, 10, 0, 4, 1, 0, 4, 3, 4, 3, 0}
err = checkStatus(nil, blockPool, true, expected)
if err != nil {
return
}
peer3.serveBlocks(0, 1)
blockPool.Wait(waitTimeout)
time.Sleep(200 * time.Millisecond)
expected = []int{11, 3, 11, 3, 8, 4, 1, 8, 4, 3, 4, 3, 0}
err = checkStatus(nil, blockPool, false, expected)
blockPool.Stop()
if err != nil {
return
}
return nil
}

View File

@ -1,55 +0,0 @@
package test
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
// hashPool is a test helper, that allows random hashes to be referred to by integers
type TestHashPool struct {
intToHash
hashToInt
lock sync.Mutex
}
func NewHashPool() *TestHashPool {
return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)}
}
type intToHash map[int]common.Hash
type hashToInt map[common.Hash]int
func newHash(i int) common.Hash {
return common.BytesToHash(crypto.Sha3([]byte(string(i))))
}
func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
for _, i := range indexes {
hash, found := self.intToHash[i]
if !found {
hash = newHash(i)
self.intToHash[i] = hash
self.hashToInt[hash] = i
}
hashes = append(hashes, hash)
}
return
}
func (self *TestHashPool) HashesToIndexes(hashes []common.Hash) (indexes []int) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range hashes {
i, found := self.hashToInt[hash]
if !found {
i = -1
}
indexes = append(indexes, i)
}
return
}

View File

@ -1,74 +0,0 @@
package test
import (
"log"
"os"
"sync"
"testing"
"github.com/ethereum/go-ethereum/logger"
)
// logging in tests
var once sync.Once
/* usage:
func TestFunc(t *testing.T) {
test.LogInit()
// test
}
*/
func LogInit() {
once.Do(func() {
logger.NewStdLogSystem(os.Stdout, log.LstdFlags, logger.LogLevel(logger.DebugDetailLevel))
})
}
type testLogger struct{ t *testing.T }
/* usage:
func TestFunc(t *testing.T) {
defer test.Testlog.Detach()
// test
}
*/
func Testlog(t *testing.T) testLogger {
logger.Reset()
l := testLogger{t}
logger.AddLogSystem(l)
return l
}
func (l testLogger) LogPrint(msg logger.LogMsg) {
l.t.Log(msg.String())
}
func (testLogger) Detach() {
logger.Flush()
logger.Reset()
}
type benchLogger struct{ b *testing.B }
/* usage:
func BenchmarkFunc(b *testing.B) {
defer test.Benchlog.Detach()
// test
}
*/
func Benchlog(b *testing.B) benchLogger {
logger.Reset()
l := benchLogger{b}
logger.AddLogSystem(l)
return l
}
func (l benchLogger) LogPrint(msg logger.LogMsg) {
l.b.Log(msg.String())
}
func (benchLogger) Detach() {
logger.Flush()
logger.Reset()
}

View File

@ -1,41 +0,0 @@
package test
import (
"fmt"
"testing"
"time"
)
// miscellaneous test helpers
func CheckInt(name string, got int, expected int, t *testing.T) (err error) {
if got != expected {
err = fmt.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got)
if t != nil {
t.Error(err)
}
}
return
}
func CheckDuration(name string, got time.Duration, expected time.Duration, t *testing.T) (err error) {
if got != expected {
err = fmt.Errorf("status for %v incorrect. expected %v, got %v", name, expected, got)
if t != nil {
t.Error(err)
}
}
return
}
func ArrayEq(a, b []int) bool {
if len(a) != len(b) {
return false
}
for i := range a {
if a[i] != b[i] {
return false
}
}
return true
}

View File

@ -85,8 +85,8 @@ func (self *BlockProcessor) ApplyTransaction(coinbase *state.StateObject, stated
_, gas, err := ApplyMessage(NewEnv(statedb, self.bc, tx, block), tx, cb) _, gas, err := ApplyMessage(NewEnv(statedb, self.bc, tx, block), tx, cb)
if err != nil && (IsNonceErr(err) || state.IsGasLimitErr(err) || IsInvalidTxErr(err)) { if err != nil && (IsNonceErr(err) || state.IsGasLimitErr(err) || IsInvalidTxErr(err)) {
// If the account is managed, remove the invalid nonce. // If the account is managed, remove the invalid nonce.
from, _ := tx.From() //from, _ := tx.From()
self.bc.TxState().RemoveNonce(from, tx.Nonce()) //self.bc.TxState().RemoveNonce(from, tx.Nonce())
return nil, nil, err return nil, nil, err
} }

View File

@ -26,11 +26,10 @@ var (
blockNumPre = []byte("block-num-") blockNumPre = []byte("block-num-")
) )
const blockCacheLimit = 10000 const (
blockCacheLimit = 10000
type StateQuery interface { maxFutureBlocks = 256
GetAccount(addr []byte) *state.StateObject )
}
func CalcDifficulty(block, parent *types.Header) *big.Int { func CalcDifficulty(block, parent *types.Header) *big.Int {
diff := new(big.Int) diff := new(big.Int)
@ -95,7 +94,14 @@ type ChainManager struct {
} }
func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager { func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *ChainManager {
bc := &ChainManager{blockDb: blockDb, stateDb: stateDb, genesisBlock: GenesisBlock(stateDb), eventMux: mux, quit: make(chan struct{}), cache: NewBlockCache(blockCacheLimit)} bc := &ChainManager{
blockDb: blockDb,
stateDb: stateDb,
genesisBlock: GenesisBlock(stateDb),
eventMux: mux,
quit: make(chan struct{}),
cache: NewBlockCache(blockCacheLimit),
}
bc.setLastBlock() bc.setLastBlock()
// Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain // Check the current state of the block hashes and make sure that we do not have any of the bad blocks in our chain
@ -116,7 +122,7 @@ func NewChainManager(blockDb, stateDb common.Database, mux *event.TypeMux) *Chai
// Take ownership of this particular state // Take ownership of this particular state
bc.txState = state.ManageState(bc.State().Copy()) bc.txState = state.ManageState(bc.State().Copy())
bc.futureBlocks = NewBlockCache(254) bc.futureBlocks = NewBlockCache(maxFutureBlocks)
bc.makeCache() bc.makeCache()
go bc.update() go bc.update()

View File

@ -134,7 +134,8 @@ Logs:
for i, topics := range self.topics { for i, topics := range self.topics {
for _, topic := range topics { for _, topic := range topics {
var match bool var match bool
if log.Topics[i] == topic { // common.Hash{} is a match all (wildcard)
if (topic == common.Hash{}) || log.Topics[i] == topic {
match = true match = true
} }
if !match { if !match {

View File

@ -62,6 +62,7 @@ func (ms *ManagedState) NewNonce(addr common.Address) uint64 {
} }
} }
account.nonces = append(account.nonces, true) account.nonces = append(account.nonces, true)
return uint64(len(account.nonces)-1) + account.nstart return uint64(len(account.nonces)-1) + account.nstart
} }

View File

@ -28,6 +28,8 @@ const txPoolQueueSize = 50
type TxPoolHook chan *types.Transaction type TxPoolHook chan *types.Transaction
type TxMsg struct{ Tx *types.Transaction } type TxMsg struct{ Tx *types.Transaction }
type stateFn func() *state.StateDB
const ( const (
minGasPrice = 1000000 minGasPrice = 1000000
) )
@ -47,7 +49,7 @@ type TxPool struct {
// Quiting channel // Quiting channel
quit chan bool quit chan bool
// The state function which will allow us to do some pre checkes // The state function which will allow us to do some pre checkes
currentState func() *state.StateDB currentState stateFn
// The actual pool // The actual pool
txs map[common.Hash]*types.Transaction txs map[common.Hash]*types.Transaction
invalidHashes *set.Set invalidHashes *set.Set
@ -57,7 +59,7 @@ type TxPool struct {
eventMux *event.TypeMux eventMux *event.TypeMux
} }
func NewTxPool(eventMux *event.TypeMux, currentStateFn func() *state.StateDB) *TxPool { func NewTxPool(eventMux *event.TypeMux, currentStateFn stateFn) *TxPool {
return &TxPool{ return &TxPool{
txs: make(map[common.Hash]*types.Transaction), txs: make(map[common.Hash]*types.Transaction),
queueChan: make(chan *types.Transaction, txPoolQueueSize), queueChan: make(chan *types.Transaction, txPoolQueueSize),

View File

@ -253,11 +253,23 @@ func (self *worker) commitNewWork() {
// Keep track of transactions which return errors so they can be removed // Keep track of transactions which return errors so they can be removed
var ( var (
remove = set.New() remove = set.New()
tcount = 0 tcount = 0
ignoredTransactors = set.New()
) )
//gasLimit: //gasLimit:
for _, tx := range transactions { for _, tx := range transactions {
// We can skip err. It has already been validated in the tx pool
from, _ := tx.From()
// Move on to the next transaction when the transactor is in ignored transactions set
// This may occur when a transaction hits the gas limit. When a gas limit is hit and
// the transaction is processed (that could potentially be included in the block) it
// will throw a nonce error because the previous transaction hasn't been processed.
// Therefor we need to ignore any transaction after the ignored one.
if ignoredTransactors.Has(from) {
continue
}
self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0) self.current.state.StartRecord(tx.Hash(), common.Hash{}, 0)
err := self.commitTransaction(tx) err := self.commitTransaction(tx)
@ -265,14 +277,18 @@ func (self *worker) commitNewWork() {
case core.IsNonceErr(err) || core.IsInvalidTxErr(err): case core.IsNonceErr(err) || core.IsInvalidTxErr(err):
// Remove invalid transactions // Remove invalid transactions
from, _ := tx.From() from, _ := tx.From()
self.chain.TxState().RemoveNonce(from, tx.Nonce()) self.chain.TxState().RemoveNonce(from, tx.Nonce())
remove.Add(tx.Hash()) remove.Add(tx.Hash())
if glog.V(logger.Detail) { if glog.V(logger.Detail) {
glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err) glog.Infof("TX (%x) failed, will be removed: %v\n", tx.Hash().Bytes()[:4], err)
//glog.Infoln(tx)
} }
case state.IsGasLimitErr(err): case state.IsGasLimitErr(err):
from, _ := tx.From()
// ignore the transactor so no nonce errors will be thrown for this account
// next time the worker is run, they'll be picked up again.
ignoredTransactors.Add(from)
//glog.V(logger.Debug).Infof("Gas limit reached for block. %d TXs included in this block\n", i) //glog.V(logger.Debug).Infof("Gas limit reached for block. %d TXs included in this block\n", i)
//break gasLimit //break gasLimit
default: default:

View File

@ -739,10 +739,14 @@ func (args *BlockFilterArgs) UnmarshalJSON(b []byte) (err error) {
for j, jv := range argarray { for j, jv := range argarray {
if v, ok := jv.(string); ok { if v, ok := jv.(string); ok {
topicdbl[i][j] = v topicdbl[i][j] = v
} else if jv == nil {
topicdbl[i][j] = ""
} else { } else {
return NewInvalidTypeError(fmt.Sprintf("topic[%d][%d]", i, j), "is not a string") return NewInvalidTypeError(fmt.Sprintf("topic[%d][%d]", i, j), "is not a string")
} }
} }
} else if iv == nil {
topicdbl[i] = []string{""}
} else { } else {
return NewInvalidTypeError(fmt.Sprintf("topic[%d]", i), "not a string or array") return NewInvalidTypeError(fmt.Sprintf("topic[%d]", i), "not a string or array")
} }