forked from cerc-io/plugeth
Merge pull request #988 from karalabe/fix-downloader-vulnerabilities
Fix downloader vulnerabilities
This commit is contained in:
commit
7d71a75d77
@ -2,7 +2,7 @@ package downloader
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"fmt"
|
"math/rand"
|
||||||
"sync"
|
"sync"
|
||||||
"sync/atomic"
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
@ -15,29 +15,34 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
maxBlockFetch = 128 // Amount of max blocks to be fetched per chunk
|
maxHashFetch = 512 // Amount of hashes to be fetched per chunk
|
||||||
|
maxBlockFetch = 128 // Amount of blocks to be fetched per chunk
|
||||||
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
|
peerCountTimeout = 12 * time.Second // Amount of time it takes for the peer handler to ignore minDesiredPeerCount
|
||||||
hashTtl = 20 * time.Second // The amount of time it takes for a hash request to time out
|
hashTTL = 5 * time.Second // Time it takes for a hash request to time out
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
blockTTL = 5 * time.Second // Time it takes for a block request to time out
|
||||||
blockTtl = 20 * time.Second // The amount of time it takes for a block request to time out
|
crossCheckCycle = time.Second // Period after which to check for expired cross checks
|
||||||
|
minDesiredPeerCount = 5 // Amount of peers desired to start syncing
|
||||||
|
)
|
||||||
|
|
||||||
errLowTd = errors.New("peer's TD is too low")
|
var (
|
||||||
ErrBusy = errors.New("busy")
|
errLowTd = errors.New("peer's TD is too low")
|
||||||
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
ErrBusy = errors.New("busy")
|
||||||
errBadPeer = errors.New("action from bad peer ignored")
|
errUnknownPeer = errors.New("peer's unknown or unhealthy")
|
||||||
errNoPeers = errors.New("no peers to keep download active")
|
ErrBadPeer = errors.New("action from bad peer ignored")
|
||||||
ErrPendingQueue = errors.New("pending items in queue")
|
errNoPeers = errors.New("no peers to keep download active")
|
||||||
ErrTimeout = errors.New("timeout")
|
ErrPendingQueue = errors.New("pending items in queue")
|
||||||
errEmptyHashSet = errors.New("empty hash set by peer")
|
ErrTimeout = errors.New("timeout")
|
||||||
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
errEmptyHashSet = errors.New("empty hash set by peer")
|
||||||
errAlreadyInPool = errors.New("hash already in pool")
|
errPeersUnavailable = errors.New("no peers available or all peers tried for block download process")
|
||||||
errBlockNumberOverflow = errors.New("received block which overflows")
|
errAlreadyInPool = errors.New("hash already in pool")
|
||||||
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
|
ErrInvalidChain = errors.New("retrieved hash chain is invalid")
|
||||||
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
|
ErrCrossCheckFailed = errors.New("block cross-check failed")
|
||||||
errNoSyncActive = errors.New("no sync active")
|
errCancelHashFetch = errors.New("hash fetching cancelled (requested)")
|
||||||
|
errCancelBlockFetch = errors.New("block downloading cancelled (requested)")
|
||||||
|
errNoSyncActive = errors.New("no sync active")
|
||||||
)
|
)
|
||||||
|
|
||||||
type hashCheckFn func(common.Hash) bool
|
type hashCheckFn func(common.Hash) bool
|
||||||
@ -58,9 +63,10 @@ type hashPack struct {
|
|||||||
type Downloader struct {
|
type Downloader struct {
|
||||||
mux *event.TypeMux
|
mux *event.TypeMux
|
||||||
|
|
||||||
mu sync.RWMutex
|
mu sync.RWMutex
|
||||||
queue *queue
|
queue *queue // Scheduler for selecting the hashes to download
|
||||||
peers *peerSet
|
peers *peerSet // Set of active peers from which download can proceed
|
||||||
|
checks map[common.Hash]time.Time // Pending cross checks to verify a hash chain
|
||||||
|
|
||||||
// Callbacks
|
// Callbacks
|
||||||
hasBlock hashCheckFn
|
hasBlock hashCheckFn
|
||||||
@ -153,6 +159,7 @@ func (d *Downloader) Synchronise(id string, hash common.Hash) error {
|
|||||||
// Reset the queue and peer set to clean any internal leftover state
|
// Reset the queue and peer set to clean any internal leftover state
|
||||||
d.queue.Reset()
|
d.queue.Reset()
|
||||||
d.peers.Reset()
|
d.peers.Reset()
|
||||||
|
d.checks = make(map[common.Hash]time.Time)
|
||||||
|
|
||||||
// Retrieve the origin peer and initiate the downloading process
|
// Retrieve the origin peer and initiate the downloading process
|
||||||
p := d.peers.Peer(id)
|
p := d.peers.Peer(id)
|
||||||
@ -177,7 +184,7 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash) (err error) {
|
|||||||
defer func() {
|
defer func() {
|
||||||
// reset on error
|
// reset on error
|
||||||
if err != nil {
|
if err != nil {
|
||||||
d.queue.Reset()
|
d.Cancel()
|
||||||
d.mux.Post(FailedEvent{err})
|
d.mux.Post(FailedEvent{err})
|
||||||
} else {
|
} else {
|
||||||
d.mux.Post(DoneEvent{})
|
d.mux.Post(DoneEvent{})
|
||||||
@ -221,66 +228,98 @@ func (d *Downloader) fetchHashes(p *peer, h common.Hash) error {
|
|||||||
|
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
|
|
||||||
// Add the hash to the queue first
|
// Add the hash to the queue first, and start hash retrieval
|
||||||
d.queue.Insert([]common.Hash{h})
|
d.queue.Insert([]common.Hash{h})
|
||||||
|
|
||||||
// Get the first batch of hashes
|
|
||||||
p.getHashes(h)
|
p.getHashes(h)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
failureResponseTimer = time.NewTimer(hashTtl)
|
active = p // active peer will help determine the current active peer
|
||||||
attemptedPeers = make(map[string]bool) // attempted peers will help with retries
|
head = common.Hash{} // common and last hash
|
||||||
activePeer = p // active peer will help determine the current active peer
|
|
||||||
hash common.Hash // common and last hash
|
|
||||||
)
|
|
||||||
attemptedPeers[p.id] = true
|
|
||||||
|
|
||||||
out:
|
timeout = time.NewTimer(hashTTL) // timer to dump a non-responsive active peer
|
||||||
for {
|
attempted = make(map[string]bool) // attempted peers will help with retries
|
||||||
|
crossTicker = time.NewTicker(crossCheckCycle) // ticker to periodically check expired cross checks
|
||||||
|
)
|
||||||
|
defer crossTicker.Stop()
|
||||||
|
|
||||||
|
attempted[p.id] = true
|
||||||
|
for finished := false; !finished; {
|
||||||
select {
|
select {
|
||||||
case <-d.cancelCh:
|
case <-d.cancelCh:
|
||||||
return errCancelHashFetch
|
return errCancelHashFetch
|
||||||
|
|
||||||
case hashPack := <-d.hashCh:
|
case hashPack := <-d.hashCh:
|
||||||
// Make sure the active peer is giving us the hashes
|
// Make sure the active peer is giving us the hashes
|
||||||
if hashPack.peerId != activePeer.id {
|
if hashPack.peerId != active.id {
|
||||||
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
|
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)\n", hashPack.peerId)
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
timeout.Reset(hashTTL)
|
||||||
failureResponseTimer.Reset(hashTtl)
|
|
||||||
|
|
||||||
// Make sure the peer actually gave something valid
|
// Make sure the peer actually gave something valid
|
||||||
if len(hashPack.hashes) == 0 {
|
if len(hashPack.hashes) == 0 {
|
||||||
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", activePeer.id)
|
glog.V(logger.Debug).Infof("Peer (%s) responded with empty hash set\n", active.id)
|
||||||
d.queue.Reset()
|
|
||||||
|
|
||||||
return errEmptyHashSet
|
return errEmptyHashSet
|
||||||
}
|
}
|
||||||
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
|
// Determine if we're done fetching hashes (queue up all pending), and continue if not done
|
||||||
done, index := false, 0
|
done, index := false, 0
|
||||||
for index, hash = range hashPack.hashes {
|
for index, head = range hashPack.hashes {
|
||||||
if d.hasBlock(hash) || d.queue.GetBlock(hash) != nil {
|
if d.hasBlock(head) || d.queue.GetBlock(head) != nil {
|
||||||
glog.V(logger.Debug).Infof("Found common hash %x\n", hash[:4])
|
glog.V(logger.Debug).Infof("Found common hash %x\n", head[:4])
|
||||||
hashPack.hashes = hashPack.hashes[:index]
|
hashPack.hashes = hashPack.hashes[:index]
|
||||||
done = true
|
done = true
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
d.queue.Insert(hashPack.hashes)
|
// Insert all the new hashes, but only continue if got something useful
|
||||||
|
inserts := d.queue.Insert(hashPack.hashes)
|
||||||
|
if len(inserts) == 0 && !done {
|
||||||
|
glog.V(logger.Debug).Infof("Peer (%s) responded with stale hashes\n", active.id)
|
||||||
|
return ErrBadPeer
|
||||||
|
}
|
||||||
if !done {
|
if !done {
|
||||||
activePeer.getHashes(hash)
|
// Try and fetch a random block to verify the hash batch
|
||||||
|
cross := inserts[rand.Intn(len(inserts))]
|
||||||
|
glog.V(logger.Detail).Infof("Cross checking (%s) with %x", active.id, cross)
|
||||||
|
|
||||||
|
d.checks[cross] = time.Now().Add(blockTTL)
|
||||||
|
active.getBlocks([]common.Hash{cross})
|
||||||
|
|
||||||
|
// Also fetch a fresh
|
||||||
|
active.getHashes(head)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
// We're done, allocate the download cache and proceed pulling the blocks
|
// We're done, allocate the download cache and proceed pulling the blocks
|
||||||
offset := 0
|
offset := 0
|
||||||
if block := d.getBlock(hash); block != nil {
|
if block := d.getBlock(head); block != nil {
|
||||||
offset = int(block.NumberU64() + 1)
|
offset = int(block.NumberU64() + 1)
|
||||||
}
|
}
|
||||||
d.queue.Alloc(offset)
|
d.queue.Alloc(offset)
|
||||||
break out
|
finished = true
|
||||||
|
|
||||||
case <-failureResponseTimer.C:
|
case blockPack := <-d.blockCh:
|
||||||
|
// Cross check the block with the random verifications
|
||||||
|
if blockPack.peerId != active.id || len(blockPack.blocks) != 1 {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
block := blockPack.blocks[0]
|
||||||
|
if _, ok := d.checks[block.Hash()]; ok {
|
||||||
|
if !d.queue.Has(block.ParentHash()) {
|
||||||
|
return ErrCrossCheckFailed
|
||||||
|
}
|
||||||
|
delete(d.checks, block.Hash())
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-crossTicker.C:
|
||||||
|
// Iterate over all the cross checks and fail the hash chain if they're not verified
|
||||||
|
for hash, deadline := range d.checks {
|
||||||
|
if time.Now().After(deadline) {
|
||||||
|
glog.V(logger.Debug).Infof("Cross check timeout for %x", hash)
|
||||||
|
return ErrCrossCheckFailed
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
case <-timeout.C:
|
||||||
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
glog.V(logger.Debug).Infof("Peer (%s) didn't respond in time for hash request\n", p.id)
|
||||||
|
|
||||||
var p *peer // p will be set if a peer can be found
|
var p *peer // p will be set if a peer can be found
|
||||||
@ -288,21 +327,20 @@ out:
|
|||||||
// already fetched hash list. This can't guarantee 100% correctness but does
|
// already fetched hash list. This can't guarantee 100% correctness but does
|
||||||
// a fair job. This is always either correct or false incorrect.
|
// a fair job. This is always either correct or false incorrect.
|
||||||
for _, peer := range d.peers.AllPeers() {
|
for _, peer := range d.peers.AllPeers() {
|
||||||
if d.queue.Has(peer.head) && !attemptedPeers[peer.id] {
|
if d.queue.Has(peer.head) && !attempted[peer.id] {
|
||||||
p = peer
|
p = peer
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
// if all peers have been tried, abort the process entirely or if the hash is
|
// if all peers have been tried, abort the process entirely or if the hash is
|
||||||
// the zero hash.
|
// the zero hash.
|
||||||
if p == nil || (hash == common.Hash{}) {
|
if p == nil || (head == common.Hash{}) {
|
||||||
d.queue.Reset()
|
|
||||||
return ErrTimeout
|
return ErrTimeout
|
||||||
}
|
}
|
||||||
// set p to the active peer. this will invalidate any hashes that may be returned
|
// set p to the active peer. this will invalidate any hashes that may be returned
|
||||||
// by our previous (delayed) peer.
|
// by our previous (delayed) peer.
|
||||||
activePeer = p
|
active = p
|
||||||
p.getHashes(hash)
|
p.getHashes(head)
|
||||||
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
|
glog.V(logger.Debug).Infof("Hash fetching switched to new peer(%s)\n", p.id)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -325,12 +363,26 @@ out:
|
|||||||
select {
|
select {
|
||||||
case <-d.cancelCh:
|
case <-d.cancelCh:
|
||||||
return errCancelBlockFetch
|
return errCancelBlockFetch
|
||||||
|
|
||||||
case blockPack := <-d.blockCh:
|
case blockPack := <-d.blockCh:
|
||||||
|
// Short circuit if it's a stale cross check
|
||||||
|
if len(blockPack.blocks) == 1 {
|
||||||
|
block := blockPack.blocks[0]
|
||||||
|
if _, ok := d.checks[block.Hash()]; ok {
|
||||||
|
delete(d.checks, block.Hash())
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
// If the peer was previously banned and failed to deliver it's pack
|
// If the peer was previously banned and failed to deliver it's pack
|
||||||
// in a reasonable time frame, ignore it's message.
|
// in a reasonable time frame, ignore it's message.
|
||||||
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
|
if peer := d.peers.Peer(blockPack.peerId); peer != nil {
|
||||||
// Deliver the received chunk of blocks, but drop the peer if invalid
|
// Deliver the received chunk of blocks
|
||||||
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
|
if err := d.queue.Deliver(blockPack.peerId, blockPack.blocks); err != nil {
|
||||||
|
if err == ErrInvalidChain {
|
||||||
|
// The hash chain is invalid (blocks are not ordered properly), abort
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
// Peer did deliver, but some blocks were off, penalize
|
||||||
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
|
glog.V(logger.Debug).Infof("Failed delivery for peer %s: %v\n", blockPack.peerId, err)
|
||||||
peer.Demote()
|
peer.Demote()
|
||||||
break
|
break
|
||||||
@ -348,7 +400,7 @@ out:
|
|||||||
// that badly or poorly behave are removed from the peer set (not banned).
|
// that badly or poorly behave are removed from the peer set (not banned).
|
||||||
// Bad peers are excluded from the available peer set and therefor won't be
|
// Bad peers are excluded from the available peer set and therefor won't be
|
||||||
// reused. XXX We could re-introduce peers after X time.
|
// reused. XXX We could re-introduce peers after X time.
|
||||||
badPeers := d.queue.Expire(blockTtl)
|
badPeers := d.queue.Expire(blockTTL)
|
||||||
for _, pid := range badPeers {
|
for _, pid := range badPeers {
|
||||||
// XXX We could make use of a reputation system here ranking peers
|
// XXX We could make use of a reputation system here ranking peers
|
||||||
// in their performance
|
// in their performance
|
||||||
@ -361,7 +413,6 @@ out:
|
|||||||
}
|
}
|
||||||
// After removing bad peers make sure we actually have sufficient peer left to keep downloading
|
// After removing bad peers make sure we actually have sufficient peer left to keep downloading
|
||||||
if d.peers.Len() == 0 {
|
if d.peers.Len() == 0 {
|
||||||
d.queue.Reset()
|
|
||||||
return errNoPeers
|
return errNoPeers
|
||||||
}
|
}
|
||||||
// If there are unrequested hashes left start fetching
|
// If there are unrequested hashes left start fetching
|
||||||
@ -395,9 +446,7 @@ out:
|
|||||||
// Make sure that we have peers available for fetching. If all peers have been tried
|
// Make sure that we have peers available for fetching. If all peers have been tried
|
||||||
// and all failed throw an error
|
// and all failed throw an error
|
||||||
if d.queue.InFlight() == 0 {
|
if d.queue.InFlight() == 0 {
|
||||||
d.queue.Reset()
|
return errPeersUnavailable
|
||||||
|
|
||||||
return fmt.Errorf("%v peers available = %d. total peers = %d. hashes needed = %d", errPeersUnavailable, len(idlePeers), d.peers.Len(), d.queue.Pending())
|
|
||||||
}
|
}
|
||||||
|
|
||||||
} else if d.queue.InFlight() == 0 {
|
} else if d.queue.InFlight() == 0 {
|
||||||
|
@ -23,25 +23,26 @@ func createHashes(start, amount int) (hashes []common.Hash) {
|
|||||||
for i := range hashes[:len(hashes)-1] {
|
for i := range hashes[:len(hashes)-1] {
|
||||||
binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2))
|
binary.BigEndian.PutUint64(hashes[i][:8], uint64(i+2))
|
||||||
}
|
}
|
||||||
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBlock(i int, prevHash, hash common.Hash) *types.Block {
|
func createBlock(i int, parent, hash common.Hash) *types.Block {
|
||||||
header := &types.Header{Number: big.NewInt(int64(i))}
|
header := &types.Header{Number: big.NewInt(int64(i))}
|
||||||
block := types.NewBlockWithHeader(header)
|
block := types.NewBlockWithHeader(header)
|
||||||
block.HeaderHash = hash
|
block.HeaderHash = hash
|
||||||
block.ParentHeaderHash = prevHash
|
block.ParentHeaderHash = parent
|
||||||
return block
|
return block
|
||||||
}
|
}
|
||||||
|
|
||||||
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
|
func createBlocksFromHashes(hashes []common.Hash) map[common.Hash]*types.Block {
|
||||||
blocks := make(map[common.Hash]*types.Block)
|
blocks := make(map[common.Hash]*types.Block)
|
||||||
|
for i := 0; i < len(hashes); i++ {
|
||||||
for i, hash := range hashes {
|
parent := knownHash
|
||||||
blocks[hash] = createBlock(len(hashes)-i, knownHash, hash)
|
if i < len(hashes)-1 {
|
||||||
|
parent = hashes[i+1]
|
||||||
|
}
|
||||||
|
blocks[hashes[i]] = createBlock(len(hashes)-i, parent, hashes[i])
|
||||||
}
|
}
|
||||||
|
|
||||||
return blocks
|
return blocks
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -75,9 +76,40 @@ func newTester(t *testing.T, hashes []common.Hash, blocks map[common.Hash]*types
|
|||||||
return tester
|
return tester
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) sync(peerId string, hash common.Hash) error {
|
// sync is a simple wrapper around the downloader to start synchronisation and
|
||||||
|
// block until it returns
|
||||||
|
func (dl *downloadTester) sync(peerId string, head common.Hash) error {
|
||||||
dl.activePeerId = peerId
|
dl.activePeerId = peerId
|
||||||
return dl.downloader.Synchronise(peerId, hash)
|
return dl.downloader.Synchronise(peerId, head)
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncTake is starts synchronising with a remote peer, but concurrently it also
|
||||||
|
// starts fetching blocks that the downloader retrieved. IT blocks until both go
|
||||||
|
// routines terminate.
|
||||||
|
func (dl *downloadTester) syncTake(peerId string, head common.Hash) (types.Blocks, error) {
|
||||||
|
// Start a block collector to take blocks as they become available
|
||||||
|
done := make(chan struct{})
|
||||||
|
took := []*types.Block{}
|
||||||
|
go func() {
|
||||||
|
for running := true; running; {
|
||||||
|
select {
|
||||||
|
case <-done:
|
||||||
|
running = false
|
||||||
|
default:
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
}
|
||||||
|
// Take a batch of blocks and accumulate
|
||||||
|
took = append(took, dl.downloader.TakeBlocks()...)
|
||||||
|
}
|
||||||
|
done <- struct{}{}
|
||||||
|
}()
|
||||||
|
// Start the downloading, sync the taker and return
|
||||||
|
err := dl.sync(peerId, head)
|
||||||
|
|
||||||
|
done <- struct{}{}
|
||||||
|
<-done
|
||||||
|
|
||||||
|
return took, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
|
func (dl *downloadTester) insertBlocks(blocks types.Blocks) {
|
||||||
@ -99,18 +131,37 @@ func (dl *downloadTester) getBlock(hash common.Hash) *types.Block {
|
|||||||
return dl.blocks[knownHash]
|
return dl.blocks[knownHash]
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) getHashes(hash common.Hash) error {
|
// getHashes retrieves a batch of hashes for reconstructing the chain.
|
||||||
dl.downloader.DeliverHashes(dl.activePeerId, dl.hashes)
|
func (dl *downloadTester) getHashes(head common.Hash) error {
|
||||||
|
// Gather the next batch of hashes
|
||||||
|
hashes := make([]common.Hash, 0, maxHashFetch)
|
||||||
|
for i, hash := range dl.hashes {
|
||||||
|
if hash == head {
|
||||||
|
i++
|
||||||
|
for len(hashes) < cap(hashes) && i < len(dl.hashes) {
|
||||||
|
hashes = append(hashes, dl.hashes[i])
|
||||||
|
i++
|
||||||
|
}
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Delay delivery a bit to allow attacks to unfold
|
||||||
|
id := dl.activePeerId
|
||||||
|
go func() {
|
||||||
|
time.Sleep(time.Millisecond)
|
||||||
|
dl.downloader.DeliverHashes(id, hashes)
|
||||||
|
}()
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
|
func (dl *downloadTester) getBlocks(id string) func([]common.Hash) error {
|
||||||
return func(hashes []common.Hash) error {
|
return func(hashes []common.Hash) error {
|
||||||
blocks := make([]*types.Block, len(hashes))
|
blocks := make([]*types.Block, 0, len(hashes))
|
||||||
for i, hash := range hashes {
|
for _, hash := range hashes {
|
||||||
blocks[i] = dl.blocks[hash]
|
if block, ok := dl.blocks[hash]; ok {
|
||||||
|
blocks = append(blocks, block)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
go dl.downloader.DeliverBlocks(id, blocks)
|
go dl.downloader.DeliverBlocks(id, blocks)
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
@ -134,7 +185,7 @@ func (dl *downloadTester) badBlocksPeer(id string, td *big.Int, hash common.Hash
|
|||||||
|
|
||||||
func TestDownload(t *testing.T) {
|
func TestDownload(t *testing.T) {
|
||||||
minDesiredPeerCount = 4
|
minDesiredPeerCount = 4
|
||||||
blockTtl = 1 * time.Second
|
blockTTL = 1 * time.Second
|
||||||
|
|
||||||
targetBlocks := 1000
|
targetBlocks := 1000
|
||||||
hashes := createHashes(0, targetBlocks)
|
hashes := createHashes(0, targetBlocks)
|
||||||
@ -183,7 +234,7 @@ func TestMissing(t *testing.T) {
|
|||||||
|
|
||||||
func TestTaking(t *testing.T) {
|
func TestTaking(t *testing.T) {
|
||||||
minDesiredPeerCount = 4
|
minDesiredPeerCount = 4
|
||||||
blockTtl = 1 * time.Second
|
blockTTL = 1 * time.Second
|
||||||
|
|
||||||
targetBlocks := 1000
|
targetBlocks := 1000
|
||||||
hashes := createHashes(0, targetBlocks)
|
hashes := createHashes(0, targetBlocks)
|
||||||
@ -224,7 +275,7 @@ func TestInactiveDownloader(t *testing.T) {
|
|||||||
|
|
||||||
func TestCancel(t *testing.T) {
|
func TestCancel(t *testing.T) {
|
||||||
minDesiredPeerCount = 4
|
minDesiredPeerCount = 4
|
||||||
blockTtl = 1 * time.Second
|
blockTTL = 1 * time.Second
|
||||||
|
|
||||||
targetBlocks := 1000
|
targetBlocks := 1000
|
||||||
hashes := createHashes(0, targetBlocks)
|
hashes := createHashes(0, targetBlocks)
|
||||||
@ -250,7 +301,7 @@ func TestCancel(t *testing.T) {
|
|||||||
|
|
||||||
func TestThrottling(t *testing.T) {
|
func TestThrottling(t *testing.T) {
|
||||||
minDesiredPeerCount = 4
|
minDesiredPeerCount = 4
|
||||||
blockTtl = 1 * time.Second
|
blockTTL = 1 * time.Second
|
||||||
|
|
||||||
targetBlocks := 16 * blockCacheLimit
|
targetBlocks := 16 * blockCacheLimit
|
||||||
hashes := createHashes(0, targetBlocks)
|
hashes := createHashes(0, targetBlocks)
|
||||||
@ -263,32 +314,7 @@ func TestThrottling(t *testing.T) {
|
|||||||
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
|
tester.badBlocksPeer("peer4", big.NewInt(0), common.Hash{})
|
||||||
|
|
||||||
// Concurrently download and take the blocks
|
// Concurrently download and take the blocks
|
||||||
errc := make(chan error, 1)
|
took, err := tester.syncTake("peer1", hashes[0])
|
||||||
go func() {
|
|
||||||
errc <- tester.sync("peer1", hashes[0])
|
|
||||||
}()
|
|
||||||
|
|
||||||
done := make(chan struct{})
|
|
||||||
took := []*types.Block{}
|
|
||||||
go func() {
|
|
||||||
for running := true; running; {
|
|
||||||
select {
|
|
||||||
case <-done:
|
|
||||||
running = false
|
|
||||||
default:
|
|
||||||
time.Sleep(time.Millisecond)
|
|
||||||
}
|
|
||||||
// Take a batch of blocks and accumulate
|
|
||||||
took = append(took, tester.downloader.TakeBlocks()...)
|
|
||||||
}
|
|
||||||
done <- struct{}{}
|
|
||||||
}()
|
|
||||||
|
|
||||||
// Synchronise the two threads and verify
|
|
||||||
err := <-errc
|
|
||||||
done <- struct{}{}
|
|
||||||
<-done
|
|
||||||
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to synchronise blocks: %v", err)
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
}
|
}
|
||||||
@ -336,3 +362,137 @@ func TestNonExistingParentAttack(t *testing.T) {
|
|||||||
t.Fatalf("tester doesn't know about the origin hash")
|
t.Fatalf("tester doesn't know about the origin hash")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Tests that if a malicious peers keeps sending us repeating hashes, we don't
|
||||||
|
// loop indefinitely.
|
||||||
|
func TestRepeatingHashAttack(t *testing.T) {
|
||||||
|
// Create a valid chain, but drop the last link
|
||||||
|
hashes := createHashes(0, blockCacheLimit)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
forged := hashes[:len(hashes)-1]
|
||||||
|
|
||||||
|
// Try and sync with the malicious node
|
||||||
|
tester := newTester(t, forged, blocks)
|
||||||
|
tester.newPeer("attack", big.NewInt(10000), forged[0])
|
||||||
|
|
||||||
|
errc := make(chan error)
|
||||||
|
go func() {
|
||||||
|
errc <- tester.sync("attack", hashes[0])
|
||||||
|
}()
|
||||||
|
|
||||||
|
// Make sure that syncing returns and does so with a failure
|
||||||
|
select {
|
||||||
|
case <-time.After(100 * time.Millisecond):
|
||||||
|
t.Fatalf("synchronisation blocked")
|
||||||
|
case err := <-errc:
|
||||||
|
if err == nil {
|
||||||
|
t.Fatalf("synchronisation succeeded")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Ensure that a valid chain can still pass sync
|
||||||
|
tester.hashes = hashes
|
||||||
|
tester.newPeer("valid", big.NewInt(20000), hashes[0])
|
||||||
|
if err := tester.sync("valid", hashes[0]); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if a malicious peers returns a non-existent block hash, it should
|
||||||
|
// eventually time out and the sync reattempted.
|
||||||
|
func TestNonExistingBlockAttack(t *testing.T) {
|
||||||
|
// Create a valid chain, but forge the last link
|
||||||
|
hashes := createHashes(0, blockCacheLimit)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
origin := hashes[len(hashes)/2]
|
||||||
|
|
||||||
|
hashes[len(hashes)/2] = unknownHash
|
||||||
|
|
||||||
|
// Try and sync with the malicious node and check that it fails
|
||||||
|
tester := newTester(t, hashes, blocks)
|
||||||
|
tester.newPeer("attack", big.NewInt(10000), hashes[0])
|
||||||
|
if err := tester.sync("attack", hashes[0]); err != errPeersUnavailable {
|
||||||
|
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, errPeersUnavailable)
|
||||||
|
}
|
||||||
|
// Ensure that a valid chain can still pass sync
|
||||||
|
hashes[len(hashes)/2] = origin
|
||||||
|
tester.newPeer("valid", big.NewInt(20000), hashes[0])
|
||||||
|
if err := tester.sync("valid", hashes[0]); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if a malicious peer is returning hashes in a weird order, that the
|
||||||
|
// sync throttler doesn't choke on them waiting for the valid blocks.
|
||||||
|
func TestInvalidHashOrderAttack(t *testing.T) {
|
||||||
|
// Create a valid long chain, but reverse some hashes within
|
||||||
|
hashes := createHashes(0, 4*blockCacheLimit)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
|
||||||
|
chunk1 := make([]common.Hash, blockCacheLimit)
|
||||||
|
chunk2 := make([]common.Hash, blockCacheLimit)
|
||||||
|
copy(chunk1, hashes[blockCacheLimit:2*blockCacheLimit])
|
||||||
|
copy(chunk2, hashes[2*blockCacheLimit:3*blockCacheLimit])
|
||||||
|
|
||||||
|
reverse := make([]common.Hash, len(hashes))
|
||||||
|
copy(reverse, hashes)
|
||||||
|
copy(reverse[2*blockCacheLimit:], chunk1)
|
||||||
|
copy(reverse[blockCacheLimit:], chunk2)
|
||||||
|
|
||||||
|
// Try and sync with the malicious node and check that it fails
|
||||||
|
tester := newTester(t, reverse, blocks)
|
||||||
|
tester.newPeer("attack", big.NewInt(10000), reverse[0])
|
||||||
|
if _, err := tester.syncTake("attack", reverse[0]); err != ErrInvalidChain {
|
||||||
|
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrInvalidChain)
|
||||||
|
}
|
||||||
|
// Ensure that a valid chain can still pass sync
|
||||||
|
tester.hashes = hashes
|
||||||
|
tester.newPeer("valid", big.NewInt(20000), hashes[0])
|
||||||
|
if _, err := tester.syncTake("valid", hashes[0]); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if a malicious peer makes up a random hash chain and tries to push
|
||||||
|
// indefinitely, it actually gets caught with it.
|
||||||
|
func TestMadeupHashChainAttack(t *testing.T) {
|
||||||
|
blockTTL = 100 * time.Millisecond
|
||||||
|
crossCheckCycle = 25 * time.Millisecond
|
||||||
|
|
||||||
|
// Create a long chain of hashes without backing blocks
|
||||||
|
hashes := createHashes(0, 1024*blockCacheLimit)
|
||||||
|
|
||||||
|
// Try and sync with the malicious node and check that it fails
|
||||||
|
tester := newTester(t, hashes, nil)
|
||||||
|
tester.newPeer("attack", big.NewInt(10000), hashes[0])
|
||||||
|
if _, err := tester.syncTake("attack", hashes[0]); err != ErrCrossCheckFailed {
|
||||||
|
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tests that if a malicious peer makes up a random block chain, and tried to
|
||||||
|
// push indefinitely, it actually gets caught with it.
|
||||||
|
func TestMadeupBlockChainAttack(t *testing.T) {
|
||||||
|
blockTTL = 100 * time.Millisecond
|
||||||
|
crossCheckCycle = 25 * time.Millisecond
|
||||||
|
|
||||||
|
// Create a long chain of blocks and simulate an invalid chain by dropping every second
|
||||||
|
hashes := createHashes(0, 32*blockCacheLimit)
|
||||||
|
blocks := createBlocksFromHashes(hashes)
|
||||||
|
|
||||||
|
gapped := make([]common.Hash, len(hashes)/2)
|
||||||
|
for i := 0; i < len(gapped); i++ {
|
||||||
|
gapped[i] = hashes[2*i]
|
||||||
|
}
|
||||||
|
// Try and sync with the malicious node and check that it fails
|
||||||
|
tester := newTester(t, gapped, blocks)
|
||||||
|
tester.newPeer("attack", big.NewInt(10000), gapped[0])
|
||||||
|
if _, err := tester.syncTake("attack", gapped[0]); err != ErrCrossCheckFailed {
|
||||||
|
t.Fatalf("synchronisation error mismatch: have %v, want %v", err, ErrCrossCheckFailed)
|
||||||
|
}
|
||||||
|
// Ensure that a valid chain can still pass sync
|
||||||
|
tester.hashes = hashes
|
||||||
|
tester.newPeer("valid", big.NewInt(20000), hashes[0])
|
||||||
|
if _, err := tester.syncTake("valid", hashes[0]); err != nil {
|
||||||
|
t.Fatalf("failed to synchronise blocks: %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -122,24 +122,28 @@ func (q *queue) Has(hash common.Hash) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// Insert adds a set of hashes for the download queue for scheduling.
|
// Insert adds a set of hashes for the download queue for scheduling, returning
|
||||||
func (q *queue) Insert(hashes []common.Hash) {
|
// the new hashes encountered.
|
||||||
|
func (q *queue) Insert(hashes []common.Hash) []common.Hash {
|
||||||
q.lock.Lock()
|
q.lock.Lock()
|
||||||
defer q.lock.Unlock()
|
defer q.lock.Unlock()
|
||||||
|
|
||||||
// Insert all the hashes prioritized in the arrival order
|
// Insert all the hashes prioritized in the arrival order
|
||||||
for i, hash := range hashes {
|
inserts := make([]common.Hash, 0, len(hashes))
|
||||||
index := q.hashCounter + i
|
for _, hash := range hashes {
|
||||||
|
// Skip anything we already have
|
||||||
if old, ok := q.hashPool[hash]; ok {
|
if old, ok := q.hashPool[hash]; ok {
|
||||||
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
|
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
q.hashPool[hash] = index
|
// Update the counters and insert the hash
|
||||||
q.hashQueue.Push(hash, float32(index)) // Highest gets schedules first
|
q.hashCounter = q.hashCounter + 1
|
||||||
|
inserts = append(inserts, hash)
|
||||||
|
|
||||||
|
q.hashPool[hash] = q.hashCounter
|
||||||
|
q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
|
||||||
}
|
}
|
||||||
// Update the hash counter for the next batch of inserts
|
return inserts
|
||||||
q.hashCounter += len(hashes)
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
|
// GetHeadBlock retrieves the first block from the cache, or nil if it hasn't
|
||||||
@ -296,18 +300,17 @@ func (q *queue) Deliver(id string, blocks []*types.Block) (err error) {
|
|||||||
// Iterate over the downloaded blocks and add each of them
|
// Iterate over the downloaded blocks and add each of them
|
||||||
errs := make([]error, 0)
|
errs := make([]error, 0)
|
||||||
for _, block := range blocks {
|
for _, block := range blocks {
|
||||||
// Skip any blocks that fall outside the cache range
|
|
||||||
index := int(block.NumberU64()) - q.blockOffset
|
|
||||||
if index >= len(q.blockCache) || index < 0 {
|
|
||||||
//fmt.Printf("block cache overflown (N=%v O=%v, C=%v)", block.Number(), q.blockOffset, len(q.blockCache))
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
// Skip any blocks that were not requested
|
// Skip any blocks that were not requested
|
||||||
hash := block.Hash()
|
hash := block.Hash()
|
||||||
if _, ok := request.Hashes[hash]; !ok {
|
if _, ok := request.Hashes[hash]; !ok {
|
||||||
errs = append(errs, fmt.Errorf("non-requested block %v", hash))
|
errs = append(errs, fmt.Errorf("non-requested block %v", hash))
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
// If a requested block falls out of the range, the hash chain is invalid
|
||||||
|
index := int(block.NumberU64()) - q.blockOffset
|
||||||
|
if index >= len(q.blockCache) || index < 0 {
|
||||||
|
return ErrInvalidChain
|
||||||
|
}
|
||||||
// Otherwise merge the block and mark the hash block
|
// Otherwise merge the block and mark the hash block
|
||||||
q.blockCache[index] = block
|
q.blockCache[index] = block
|
||||||
|
|
||||||
|
@ -101,11 +101,13 @@ func (pm *ProtocolManager) synchronise(peer *peer) {
|
|||||||
case downloader.ErrBusy:
|
case downloader.ErrBusy:
|
||||||
glog.V(logger.Debug).Infof("Synchronisation already in progress")
|
glog.V(logger.Debug).Infof("Synchronisation already in progress")
|
||||||
|
|
||||||
case downloader.ErrTimeout:
|
case downloader.ErrTimeout, downloader.ErrBadPeer, downloader.ErrInvalidChain, downloader.ErrCrossCheckFailed:
|
||||||
glog.V(logger.Debug).Infof("Removing peer %v due to sync timeout", peer.id)
|
glog.V(logger.Debug).Infof("Removing peer %v: %v", peer.id, err)
|
||||||
pm.removePeer(peer)
|
pm.removePeer(peer)
|
||||||
|
|
||||||
case downloader.ErrPendingQueue:
|
case downloader.ErrPendingQueue:
|
||||||
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
|
glog.V(logger.Debug).Infoln("Synchronisation aborted:", err)
|
||||||
|
|
||||||
default:
|
default:
|
||||||
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
|
glog.V(logger.Warn).Infof("Synchronisation failed: %v", err)
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user