peer suspension to disallow reconnect after disconnect on fatal error for set period (PeerSuspensionInterval)

This commit is contained in:
zelig 2015-03-19 22:46:54 +00:00
parent 01ff0b3176
commit 50661f0e68
8 changed files with 125 additions and 44 deletions

View File

@ -33,7 +33,8 @@ var (
// timeout interval: max time allowed for peer without sending a block
blocksTimeout = 60 * time.Second
//
idleBestPeerTimeout = 120 * time.Second
idleBestPeerTimeout = 120 * time.Second
peerSuspensionInterval = 300 * time.Second
)
// config embedded in components, by default fall back to constants
@ -48,6 +49,7 @@ type Config struct {
BlockHashesTimeout time.Duration
BlocksTimeout time.Duration
IdleBestPeerTimeout time.Duration
PeerSuspensionInterval time.Duration
}
// blockpool errors
@ -96,6 +98,9 @@ func (self *Config) init() {
if self.IdleBestPeerTimeout == 0 {
self.IdleBestPeerTimeout = idleBestPeerTimeout
}
if self.PeerSuspensionInterval == 0 {
self.PeerSuspensionInterval = peerSuspensionInterval
}
}
// node is the basic unit of the internal model of block chain/tree in the blockpool
@ -188,9 +193,10 @@ func (self *BlockPool) Start() {
Errors: errorToString,
Level: severity,
},
peers: make(map[string]*peer),
status: self.status,
bp: self,
peers: make(map[string]*peer),
blacklist: make(map[string]time.Time),
status: self.status,
bp: self,
}
timer := time.NewTicker(3 * time.Second)
go func() {
@ -267,7 +273,8 @@ func (self *BlockPool) AddPeer(
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (best bool) {
) (best bool, suspended bool) {
return self.peers.addPeer(td, currentBlockHash, peerId, requestBlockHashes, requestBlocks, peerError)
}

View File

@ -5,8 +5,8 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
)
func TestPeerWithKnownBlock(t *testing.T) {
@ -69,8 +69,8 @@ func TestPeerPromotionByOptionalTdOnBlock(t *testing.T) {
hashes := blockPoolTester.hashPool.IndexesToHashes([]int{2, 3})
peer1.waitBlocksRequests(3)
blockPool.AddBlock(&types.Block{
HeaderHash: common.Bytes(hashes[1]),
ParentHeaderHash: common.Bytes(hashes[0]),
HeaderHash: common.Hash(hashes[1]),
ParentHeaderHash: common.Hash(hashes[0]),
Td: common.Big3,
}, "peer1")

View File

@ -8,9 +8,9 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/errs"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/pow"
)
@ -63,10 +63,10 @@ func (self *blockPoolTester) Errorf(format string, params ...interface{}) {
// blockPoolTester implements the 3 callbacks needed by the blockPool:
// hasBlock, insetChain, verifyPoW
func (self *blockPoolTester) hasBlock(block []byte) (ok bool) {
func (self *blockPoolTester) hasBlock(block common.Hash) (ok bool) {
self.lock.RLock()
defer self.lock.RUnlock()
indexes := self.hashPool.HashesToIndexes([][]byte{block})
indexes := self.hashPool.HashesToIndexes([]common.Hash{block})
i := indexes[0]
_, ok = self.blockChain[i]
fmt.Printf("has block %v (%x...): %v\n", i, block[0:4], ok)
@ -80,13 +80,13 @@ func (self *blockPoolTester) insertChain(blocks types.Blocks) error {
var children, refChildren []int
var ok bool
for _, block := range blocks {
child = self.hashPool.HashesToIndexes([][]byte{block.Hash()})[0]
child = self.hashPool.HashesToIndexes([]common.Hash{block.Hash()})[0]
_, ok = self.blockChain[child]
if ok {
fmt.Printf("block %v already in blockchain\n", child)
continue // already in chain
}
parent = self.hashPool.HashesToIndexes([][]byte{block.ParentHeaderHash})[0]
parent = self.hashPool.HashesToIndexes([]common.Hash{block.ParentHeaderHash})[0]
children, ok = self.blockChain[parent]
if !ok {
return fmt.Errorf("parent %v not in blockchain ", parent)
@ -274,9 +274,10 @@ func (self *blockPoolTester) initRefBlockChain(n int) {
// peerTester functions that mimic protocol calls to the blockpool
// registers the peer with the blockPool
func (self *peerTester) AddPeer() bool {
func (self *peerTester) AddPeer() (best bool) {
hash := self.hashPool.IndexesToHashes([]int{self.currentBlock})[0]
return self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
best, _ = self.blockPool.AddPeer(big.NewInt(int64(self.td)), hash, self.id, self.requestBlockHashes, self.requestBlocks, self.peerError)
return
}
// peer sends blockhashes if and when gets a request
@ -291,7 +292,7 @@ func (self *peerTester) sendBlockHashes(indexes ...int) {
fmt.Printf("adding block hashes %v\n", indexes)
hashes := self.hashPool.IndexesToHashes(indexes)
i := 1
next := func() (hash []byte, ok bool) {
next := func() (hash common.Hash, ok bool) {
if i < len(hashes) {
hash = hashes[i]
ok = true
@ -315,15 +316,15 @@ func (self *peerTester) sendBlocks(indexes ...int) {
hashes := self.hashPool.IndexesToHashes(indexes)
for i := 1; i < len(hashes); i++ {
fmt.Printf("adding block %v %x\n", indexes[i], hashes[i][:4])
self.blockPool.AddBlock(&types.Block{HeaderHash: common.Bytes(hashes[i]), ParentHeaderHash: common.Bytes(hashes[i-1])}, self.id)
self.blockPool.AddBlock(&types.Block{HeaderHash: hashes[i], ParentHeaderHash: hashes[i-1]}, self.id)
}
}
// peer callbacks
// -1 is special: not found (a hash never seen)
// records block hashes requests by the blockPool
func (self *peerTester) requestBlockHashes(hash []byte) error {
indexes := self.hashPool.HashesToIndexes([][]byte{hash})
func (self *peerTester) requestBlockHashes(hash common.Hash) error {
indexes := self.hashPool.HashesToIndexes([]common.Hash{hash})
fmt.Printf("[%s] block hash request %v %x\n", self.id, indexes[0], hash[:4])
self.lock.Lock()
defer self.lock.Unlock()
@ -332,7 +333,7 @@ func (self *peerTester) requestBlockHashes(hash []byte) error {
}
// records block requests by the blockPool
func (self *peerTester) requestBlocks(hashes [][]byte) error {
func (self *peerTester) requestBlocks(hashes []common.Hash) error {
indexes := self.hashPool.HashesToIndexes(hashes)
fmt.Printf("blocks request %v %x...\n", indexes, hashes[0][:4])
self.bt.reqlock.Lock()
@ -347,4 +348,9 @@ func (self *peerTester) requestBlocks(hashes [][]byte) error {
// records the error codes of all the peerErrors found the blockPool
func (self *peerTester) peerError(err *errs.Error) {
self.peerErrors = append(self.peerErrors, err.Code)
fmt.Printf("Error %v on peer %v\n", err, self.id)
if err.Fatal() {
fmt.Printf("Error %v is fatal, removing peer %v\n", err, self.id)
self.blockPool.RemovePeer(self.id)
}
}

View File

@ -21,12 +21,13 @@ func TestBlockPoolConfig(t *testing.T) {
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, blockHashesTimeout, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, idleBestPeerTimeout, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, peerSuspensionInterval, t)
}
func TestBlockPoolOverrideConfig(t *testing.T) {
test.LogInit()
blockPool := &BlockPool{Config: &Config{}}
c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second}
c := &Config{128, 32, 1, 0, 300 * time.Millisecond, 100 * time.Millisecond, 90 * time.Second, 0, 30 * time.Second, 30 * time.Second}
blockPool.Config = c
blockPool.Start()
@ -39,4 +40,5 @@ func TestBlockPoolOverrideConfig(t *testing.T) {
test.CheckDuration("BlockHashesTimeout", c.BlockHashesTimeout, 90*time.Second, t)
test.CheckDuration("BlocksTimeout", c.BlocksTimeout, blocksTimeout, t)
test.CheckDuration("IdleBestPeerTimeout", c.IdleBestPeerTimeout, 30*time.Second, t)
test.CheckDuration("PeerSuspensionInterval", c.PeerSuspensionInterval, 30*time.Second, t)
}

View File

@ -5,6 +5,7 @@ import (
"time"
"github.com/ethereum/go-ethereum/blockpool/test"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/pow"
)
@ -45,7 +46,7 @@ func TestVerifyPoW(t *testing.T) {
first := false
blockPoolTester.blockPool.verifyPoW = func(b pow.Block) bool {
bb, _ := b.(*types.Block)
indexes := blockPoolTester.hashPool.HashesToIndexes([][]byte{bb.Hash()})
indexes := blockPoolTester.hashPool.HashesToIndexes([]common.Hash{bb.Hash()})
if indexes[0] == 2 && !first {
first = true
return false
@ -122,3 +123,33 @@ func TestErrInsufficientChainInfo(t *testing.T) {
t.Errorf("expected %v error, got %v", ErrInsufficientChainInfo, peer1.peerErrors)
}
}
func TestPeerSuspension(t *testing.T) {
test.LogInit()
_, blockPool, blockPoolTester := newTestBlockPool(t)
blockPool.Config.PeerSuspensionInterval = 100 * time.Millisecond
blockPool.Start()
peer1 := blockPoolTester.newPeer("peer1", 1, 3)
peer1.AddPeer()
blockPool.peers.peerError("peer1", 0, "")
bestpeer, _ := blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed on error")
}
peer1.AddPeer()
bestpeer, _ = blockPool.peers.getPeer("peer1")
if bestpeer != nil {
t.Errorf("peer1 not removed on reconnect")
}
time.Sleep(100 * time.Millisecond)
peer1.AddPeer()
bestpeer, _ = blockPool.peers.getPeer("peer1")
if bestpeer == nil {
t.Errorf("peer1 not connected after PeerSuspensionInterval")
}
// blockPool.Wait(waitTimeout)
blockPool.Stop()
}

View File

@ -47,6 +47,8 @@ type peer struct {
blocksRequestTimer <-chan time.Time
suicideC <-chan time.Time
addToBlacklist func(id string)
idle bool
}
@ -55,11 +57,12 @@ type peer struct {
type peers struct {
lock sync.RWMutex
bp *BlockPool
errors *errs.Errors
peers map[string]*peer
best *peer
status *status
bp *BlockPool
errors *errs.Errors
peers map[string]*peer
best *peer
status *status
blacklist map[string]time.Time
}
// peer constructor
@ -84,26 +87,46 @@ func (self *peers) newPeer(
headSectionC: make(chan *section),
bp: self.bp,
idle: true,
addToBlacklist: self.addToBlacklist,
}
// at creation the peer is recorded in the peer pool
self.peers[id] = p
return
}
// dispatches an error to a peer if still connected
// dispatches an error to a peer if still connected, adds it to the blacklist
func (self *peers) peerError(id string, code int, format string, params ...interface{}) {
self.lock.RLock()
defer self.lock.RUnlock()
peer, ok := self.peers[id]
self.lock.RUnlock()
if ok {
peer.addError(code, format, params)
}
// blacklisting comes here
self.addToBlacklist(id)
}
func (self *peers) addToBlacklist(id string) {
self.lock.Lock()
defer self.lock.Unlock()
self.blacklist[id] = time.Now()
}
func (self *peers) suspended(id string) (s bool) {
self.lock.Lock()
defer self.lock.Unlock()
if suspendedAt, ok := self.blacklist[id]; ok {
if s = suspendedAt.Add(self.bp.Config.PeerSuspensionInterval).After(time.Now()); !s {
// no longer suspended, delete entry
delete(self.blacklist, id)
}
}
return
}
func (self *peer) addError(code int, format string, params ...interface{}) {
err := self.errors.New(code, format, params...)
self.peerError(err)
self.addToBlacklist(self.id)
}
func (self *peer) setChainInfo(td *big.Int, c common.Hash) {
@ -182,9 +205,13 @@ func (self *peers) addPeer(
requestBlockHashes func(common.Hash) error,
requestBlocks func([]common.Hash) error,
peerError func(*errs.Error),
) (best bool) {
) (best bool, suspended bool) {
var previousBlockHash common.Hash
if self.suspended(id) {
suspended = true
return
}
self.lock.Lock()
p, found := self.peers[id]
if found {
@ -213,7 +240,7 @@ func (self *peers) addPeer(
if self.bp.hasBlock(currentBlockHash) {
// peer not ahead
plog.Debugf("addPeer: peer <%v> with td %v and current block %s is behind", id, td, hex(currentBlockHash))
return false
return false, false
}
if self.best == p {
@ -248,8 +275,10 @@ func (self *peers) addPeer(
// removePeer is called (via RemovePeer) by the eth protocol when the peer disconnects
func (self *peers) removePeer(id string) {
plog.Debugf("addPeer: remove peer 0 <%v>", id)
self.lock.Lock()
defer self.lock.Unlock()
plog.Debugf("addPeer: remove peer 1 <%v>", id)
p, found := self.peers[id]
if !found {

View File

@ -3,6 +3,7 @@ package test
import (
"sync"
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
)
@ -13,9 +14,9 @@ func NewHashPool() *TestHashPool {
return &TestHashPool{intToHash: make(intToHash), hashToInt: make(hashToInt)}
}
type intToHash map[int][]byte
type intToHash map[int]common.Hash
type hashToInt map[string]int
type hashToInt map[common.Hash]int
// hashPool is a test helper, that allows random hashes to be referred to by integers
type TestHashPool struct {
@ -24,11 +25,11 @@ type TestHashPool struct {
lock sync.Mutex
}
func newHash(i int) []byte {
return crypto.Sha3([]byte(string(i)))
func newHash(i int) common.Hash {
return common.BytesToHash(crypto.Sha3([]byte(string(i))))
}
func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) {
func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes []common.Hash) {
self.lock.Lock()
defer self.lock.Unlock()
for _, i := range indexes {
@ -36,18 +37,18 @@ func (self *TestHashPool) IndexesToHashes(indexes []int) (hashes [][]byte) {
if !found {
hash = newHash(i)
self.intToHash[i] = hash
self.hashToInt[string(hash)] = i
self.hashToInt[hash] = i
}
hashes = append(hashes, hash)
}
return
}
func (self *TestHashPool) HashesToIndexes(hashes [][]byte) (indexes []int) {
func (self *TestHashPool) HashesToIndexes(hashes []common.Hash) (indexes []int) {
self.lock.Lock()
defer self.lock.Unlock()
for _, hash := range hashes {
i, found := self.hashToInt[string(hash)]
i, found := self.hashToInt[hash]
if !found {
i = -1
}

View File

@ -42,6 +42,7 @@ const (
ErrGenesisBlockMismatch
ErrNoStatusMsg
ErrExtraStatusMsg
ErrSuspendedPeer
)
var errorToString = map[int]string{
@ -53,6 +54,7 @@ var errorToString = map[int]string{
ErrGenesisBlockMismatch: "Genesis block mismatch",
ErrNoStatusMsg: "No status message",
ErrExtraStatusMsg: "Extra status message",
ErrSuspendedPeer: "Suspended peer",
}
// ethProtocol represents the ethereum wire protocol
@ -85,7 +87,7 @@ type chainManager interface {
type blockPool interface {
AddBlockHashes(next func() (common.Hash, bool), peerId string)
AddBlock(block *types.Block, peerId string)
AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool)
AddPeer(td *big.Int, currentBlock common.Hash, peerId string, requestHashes func(common.Hash) error, requestBlocks func([]common.Hash) error, peerError func(*errs.Error)) (best bool, suspended bool)
RemovePeer(peerId string)
}
@ -288,7 +290,7 @@ func (self *ethProtocol) handle() error {
// to simplify backend interface adding a new block
// uses AddPeer followed by AddBlock only if peer is the best peer
// (or selected as new best peer)
if self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect) {
if best, _ := self.blockPool.AddPeer(request.TD, hash, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect); best {
self.blockPool.AddBlock(request.Block, self.id)
}
@ -334,9 +336,12 @@ func (self *ethProtocol) handleStatus() error {
return self.protoError(ErrProtocolVersionMismatch, "%d (!= %d)", status.ProtocolVersion, self.protocolVersion)
}
self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
_, suspended := self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
if suspended {
return self.protoError(ErrSuspendedPeer, "")
}
self.blockPool.AddPeer(status.TD, status.CurrentBlock, self.id, self.requestBlockHashes, self.requestBlocks, self.protoErrorDisconnect)
self.peer.Infof("Peer is [eth] capable (%d/%d). TD=%v H=%x\n", status.ProtocolVersion, status.NetworkId, status.TD, status.CurrentBlock[:4])
return nil
}