eth, eth/downloader, eth/fetcher: delete eth/61 code

The eth/61 protocol was disabled in #2776, this commit removes its
message handlers and hash-chain sync logic.
This commit is contained in:
Felix Lange 2016-07-21 11:36:38 +02:00
parent a4c4125b11
commit 016007bd25
15 changed files with 125 additions and 1530 deletions

View File

@ -49,11 +49,6 @@ var (
MaxStateFetch = 384 // Amount of node state values to allow fetching per request MaxStateFetch = 384 // Amount of node state values to allow fetching per request
MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation MaxForkAncestry = 3 * params.EpochDuration.Uint64() // Maximum chain reorganisation
hashTTL = 3 * time.Second // [eth/61] Time it takes for a hash request to time out
blockTargetRTT = 3 * time.Second / 2 // [eth/61] Target time for completing a block retrieval request
blockTTL = 3 * blockTargetRTT // [eth/61] Maximum time allowance before a block request is considered expired
rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests rttMinEstimate = 2 * time.Second // Minimum round-trip time to target for download requests
rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests rttMaxEstimate = 20 * time.Second // Maximum rount-trip time to target for download requests
rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value rttMinConfidence = 0.1 // Worse confidence factor in our estimated RTT value
@ -64,7 +59,6 @@ var (
qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence qosConfidenceCap = 10 // Number of peers above which not to modify RTT confidence
qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value qosTuningImpact = 0.25 // Impact that a new tuning target has on the previous value
maxQueuedHashes = 32 * 1024 // [eth/61] Maximum number of hashes to queue for import (DOS protection)
maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection) maxQueuedHeaders = 32 * 1024 // [eth/62] Maximum number of headers to queue for import (DOS protection)
maxHeadersProcess = 2048 // Number of header download results to import at once into the chain maxHeadersProcess = 2048 // Number of header download results to import at once into the chain
maxResultsProcess = 2048 // Number of content download results to import at once into the chain maxResultsProcess = 2048 // Number of content download results to import at once into the chain
@ -84,16 +78,13 @@ var (
errStallingPeer = errors.New("peer is stalling") errStallingPeer = errors.New("peer is stalling")
errNoPeers = errors.New("no peers to keep download active") errNoPeers = errors.New("no peers to keep download active")
errTimeout = errors.New("timeout") errTimeout = errors.New("timeout")
errEmptyHashSet = errors.New("empty hash set by peer")
errEmptyHeaderSet = errors.New("empty header set by peer") errEmptyHeaderSet = errors.New("empty header set by peer")
errPeersUnavailable = errors.New("no peers available or all tried for download") errPeersUnavailable = errors.New("no peers available or all tried for download")
errAlreadyInPool = errors.New("hash already in pool")
errInvalidAncestor = errors.New("retrieved ancestor is invalid") errInvalidAncestor = errors.New("retrieved ancestor is invalid")
errInvalidChain = errors.New("retrieved hash chain is invalid") errInvalidChain = errors.New("retrieved hash chain is invalid")
errInvalidBlock = errors.New("retrieved block is invalid") errInvalidBlock = errors.New("retrieved block is invalid")
errInvalidBody = errors.New("retrieved block body is invalid") errInvalidBody = errors.New("retrieved block body is invalid")
errInvalidReceipt = errors.New("retrieved receipt is invalid") errInvalidReceipt = errors.New("retrieved receipt is invalid")
errCancelHashFetch = errors.New("hash download canceled (requested)")
errCancelBlockFetch = errors.New("block download canceled (requested)") errCancelBlockFetch = errors.New("block download canceled (requested)")
errCancelHeaderFetch = errors.New("block header download canceled (requested)") errCancelHeaderFetch = errors.New("block header download canceled (requested)")
errCancelBodyFetch = errors.New("block body download canceled (requested)") errCancelBodyFetch = errors.New("block body download canceled (requested)")
@ -102,6 +93,7 @@ var (
errCancelHeaderProcessing = errors.New("header processing canceled (requested)") errCancelHeaderProcessing = errors.New("header processing canceled (requested)")
errCancelContentProcessing = errors.New("content processing canceled (requested)") errCancelContentProcessing = errors.New("content processing canceled (requested)")
errNoSyncActive = errors.New("no sync active") errNoSyncActive = errors.New("no sync active")
errTooOld = errors.New("peer doesn't speak recent enough protocol version (need version >= 62)")
) )
type Downloader struct { type Downloader struct {
@ -146,13 +138,10 @@ type Downloader struct {
// Channels // Channels
newPeerCh chan *peer newPeerCh chan *peer
hashCh chan dataPack // [eth/61] Channel receiving inbound hashes
blockCh chan dataPack // [eth/61] Channel receiving inbound blocks
headerCh chan dataPack // [eth/62] Channel receiving inbound block headers headerCh chan dataPack // [eth/62] Channel receiving inbound block headers
bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies bodyCh chan dataPack // [eth/62] Channel receiving inbound block bodies
receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts receiptCh chan dataPack // [eth/63] Channel receiving inbound receipts
stateCh chan dataPack // [eth/63] Channel receiving inbound node state data stateCh chan dataPack // [eth/63] Channel receiving inbound node state data
blockWakeCh chan bool // [eth/61] Channel to signal the block fetcher of new tasks
bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks bodyWakeCh chan bool // [eth/62] Channel to signal the block body fetcher of new tasks
receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks receiptWakeCh chan bool // [eth/63] Channel to signal the receipt fetcher of new tasks
stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks stateWakeCh chan bool // [eth/63] Channel to signal the state fetcher of new tasks
@ -199,13 +188,10 @@ func New(stateDb ethdb.Database, mux *event.TypeMux, hasHeader headerCheckFn, ha
rollback: rollback, rollback: rollback,
dropPeer: dropPeer, dropPeer: dropPeer,
newPeerCh: make(chan *peer, 1), newPeerCh: make(chan *peer, 1),
hashCh: make(chan dataPack, 1),
blockCh: make(chan dataPack, 1),
headerCh: make(chan dataPack, 1), headerCh: make(chan dataPack, 1),
bodyCh: make(chan dataPack, 1), bodyCh: make(chan dataPack, 1),
receiptCh: make(chan dataPack, 1), receiptCh: make(chan dataPack, 1),
stateCh: make(chan dataPack, 1), stateCh: make(chan dataPack, 1),
blockWakeCh: make(chan bool, 1),
bodyWakeCh: make(chan bool, 1), bodyWakeCh: make(chan bool, 1),
receiptWakeCh: make(chan bool, 1), receiptWakeCh: make(chan bool, 1),
stateWakeCh: make(chan bool, 1), stateWakeCh: make(chan bool, 1),
@ -251,12 +237,11 @@ func (d *Downloader) Synchronising() bool {
// RegisterPeer injects a new download peer into the set of block source to be // RegisterPeer injects a new download peer into the set of block source to be
// used for fetching hashes and blocks from. // used for fetching hashes and blocks from.
func (d *Downloader) RegisterPeer(id string, version int, head common.Hash, func (d *Downloader) RegisterPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) error {
glog.V(logger.Detail).Infoln("Registering peer", id) glog.V(logger.Detail).Infoln("Registering peer", id)
if err := d.peers.Register(newPeer(id, version, head, getRelHashes, getAbsHashes, getBlocks, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil { if err := d.peers.Register(newPeer(id, version, head, getRelHeaders, getAbsHeaders, getBlockBodies, getReceipts, getNodeData)); err != nil {
glog.V(logger.Error).Infoln("Register failed:", err) glog.V(logger.Error).Infoln("Register failed:", err)
return err return err
} }
@ -291,7 +276,9 @@ func (d *Downloader) Synchronise(id string, head common.Hash, td *big.Int, mode
case errBusy: case errBusy:
glog.V(logger.Detail).Infof("Synchronisation already in progress") glog.V(logger.Detail).Infof("Synchronisation already in progress")
case errTimeout, errBadPeer, errStallingPeer, errEmptyHashSet, errEmptyHeaderSet, errPeersUnavailable, errInvalidAncestor, errInvalidChain: case errTimeout, errBadPeer, errStallingPeer,
errEmptyHeaderSet, errPeersUnavailable, errTooOld,
errInvalidAncestor, errInvalidChain:
glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err) glog.V(logger.Debug).Infof("Removing peer %v: %v", id, err)
d.dropPeer(id) d.dropPeer(id)
@ -323,13 +310,13 @@ func (d *Downloader) synchronise(id string, hash common.Hash, td *big.Int, mode
d.queue.Reset() d.queue.Reset()
d.peers.Reset() d.peers.Reset()
for _, ch := range []chan bool{d.blockWakeCh, d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} { for _, ch := range []chan bool{d.bodyWakeCh, d.receiptWakeCh, d.stateWakeCh} {
select { select {
case <-ch: case <-ch:
default: default:
} }
} }
for _, ch := range []chan dataPack{d.hashCh, d.blockCh, d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} { for _, ch := range []chan dataPack{d.headerCh, d.bodyCh, d.receiptCh, d.stateCh} {
for empty := false; !empty; { for empty := false; !empty; {
select { select {
case <-ch: case <-ch:
@ -377,41 +364,15 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
d.mux.Post(DoneEvent{}) d.mux.Post(DoneEvent{})
} }
}() }()
if p.version < 62 {
return errTooOld
}
glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version) glog.V(logger.Debug).Infof("Synchronising with the network using: %s [eth/%d]", p.id, p.version)
defer func(start time.Time) { defer func(start time.Time) {
glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start)) glog.V(logger.Debug).Infof("Synchronisation terminated after %v", time.Since(start))
}(time.Now()) }(time.Now())
switch {
case p.version == 61:
// Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight61(p)
if err != nil {
return err
}
origin, err := d.findAncestor61(p, latest)
if err != nil {
return err
}
d.syncStatsLock.Lock()
if d.syncStatsChainHeight <= origin || d.syncStatsChainOrigin > origin {
d.syncStatsChainOrigin = origin
}
d.syncStatsChainHeight = latest
d.syncStatsLock.Unlock()
// Initiate the sync using a concurrent hash and block retrieval algorithm
d.queue.Prepare(origin+1, d.mode, 0, nil)
if d.syncInitHook != nil {
d.syncInitHook(origin, latest)
}
return d.spawnSync(origin+1,
func() error { return d.fetchHashes61(p, td, origin+1) },
func() error { return d.fetchBlocks61(origin + 1) },
)
case p.version >= 62:
// Look up the sync boundaries: the common ancestor and the target block // Look up the sync boundaries: the common ancestor and the target block
latest, err := d.fetchHeight(p) latest, err := d.fetchHeight(p)
if err != nil { if err != nil {
@ -470,12 +431,6 @@ func (d *Downloader) syncWithPeer(p *peer, hash common.Hash, td *big.Int) (err e
func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync func() error { return d.fetchReceipts(origin + 1) }, // Receipts are retrieved during fast sync
func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync func() error { return d.fetchNodeData() }, // Node state data is retrieved during fast sync
) )
default:
// Something very wrong, stop right here
glog.V(logger.Error).Infof("Unsupported eth protocol: %d", p.version)
return errBadPeer
}
} }
// spawnSync runs d.process and all given fetcher functions to completion in // spawnSync runs d.process and all given fetcher functions to completion in
@ -540,452 +495,6 @@ func (d *Downloader) Terminate() {
d.cancel() d.cancel()
} }
// fetchHeight61 retrieves the head block of the remote peer to aid in estimating
// the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight61(p *peer) (uint64, error) {
glog.V(logger.Debug).Infof("%v: retrieving remote chain height", p)
// Request the advertised remote head block and wait for the response
go p.getBlocks([]common.Hash{p.head})
timeout := time.After(hashTTL)
for {
select {
case <-d.cancelCh:
return 0, errCancelBlockFetch
case packet := <-d.blockCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received blocks from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
blocks := packet.(*blockPack).blocks
if len(blocks) != 1 {
glog.V(logger.Debug).Infof("%v: invalid number of head blocks: %d != 1", p, len(blocks))
return 0, errBadPeer
}
return blocks[0].NumberU64(), nil
case <-timeout:
glog.V(logger.Debug).Infof("%v: head block timeout", p)
return 0, errTimeout
case <-d.hashCh:
// Out of bounds hashes received, ignore them
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
}
}
}
// findAncestor61 tries to locate the common ancestor block of the local chain and
// a remote peers blockchain. In the general case when our node was in sync and
// on the correct chain, checking the top N blocks should already get us a match.
// In the rare scenario when we ended up on a long reorganisation (i.e. none of
// the head blocks match), we do a binary search to find the common ancestor.
func (d *Downloader) findAncestor61(p *peer, height uint64) (uint64, error) {
glog.V(logger.Debug).Infof("%v: looking for common ancestor", p)
// Figure out the valid ancestor range to prevent rewrite attacks
floor, ceil := int64(-1), d.headBlock().NumberU64()
if ceil >= MaxForkAncestry {
floor = int64(ceil - MaxForkAncestry)
}
// Request the topmost blocks to short circuit binary ancestor lookup
head := ceil
if head > height {
head = height
}
from := int64(head) - int64(MaxHashFetch) + 1
if from < 0 {
from = 0
}
go p.getAbsHashes(uint64(from), MaxHashFetch)
// Wait for the remote response to the head fetch
number, hash := uint64(0), common.Hash{}
timeout := time.After(hashTTL)
for finished := false; !finished; {
select {
case <-d.cancelCh:
return 0, errCancelHashFetch
case packet := <-d.hashCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
hashes := packet.(*hashPack).hashes
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("%v: empty head hash set", p)
return 0, errEmptyHashSet
}
// Check if a common ancestor was found
finished = true
for i := len(hashes) - 1; i >= 0; i-- {
// Skip any headers that underflow/overflow our requested set
header := d.getHeader(hashes[i])
if header == nil || header.Number.Int64() < from || header.Number.Uint64() > head {
continue
}
// Otherwise check if we already know the header or not
if d.hasBlockAndState(hashes[i]) {
number, hash = header.Number.Uint64(), header.Hash()
break
}
}
case <-timeout:
glog.V(logger.Debug).Infof("%v: head hash timeout", p)
return 0, errTimeout
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
}
}
// If the head fetch already found an ancestor, return
if !common.EmptyHash(hash) {
if int64(number) <= floor {
glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, number, hash[:4], floor)
return 0, errInvalidAncestor
}
glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, number, hash[:4])
return number, nil
}
// Ancestor not found, we need to binary search over our chain
start, end := uint64(0), head
if floor > 0 {
start = uint64(floor)
}
for start+1 < end {
// Split our chain interval in two, and request the hash to cross check
check := (start + end) / 2
timeout := time.After(hashTTL)
go p.getAbsHashes(uint64(check), 1)
// Wait until a reply arrives to this request
for arrived := false; !arrived; {
select {
case <-d.cancelCh:
return 0, errCancelHashFetch
case packet := <-d.hashCh:
// Discard anything not from the origin peer
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
// Make sure the peer actually gave something valid
hashes := packet.(*hashPack).hashes
if len(hashes) != 1 {
glog.V(logger.Debug).Infof("%v: invalid search hash set (%d)", p, len(hashes))
return 0, errBadPeer
}
arrived = true
// Modify the search interval based on the response
if !d.hasBlockAndState(hashes[0]) {
end = check
break
}
block := d.getBlock(hashes[0]) // this doesn't check state, hence the above explicit check
if block.NumberU64() != check {
glog.V(logger.Debug).Infof("%v: non requested hash #%d [%x…], instead of #%d", p, block.NumberU64(), block.Hash().Bytes()[:4], check)
return 0, errBadPeer
}
start = check
case <-timeout:
glog.V(logger.Debug).Infof("%v: search hash timeout", p)
return 0, errTimeout
case <-d.blockCh:
// Out of bounds blocks received, ignore them
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
}
}
}
// Ensure valid ancestry and return
if int64(start) <= floor {
glog.V(logger.Warn).Infof("%v: potential rewrite attack: #%d [%x…] <= #%d limit", p, start, hash[:4], floor)
return 0, errInvalidAncestor
}
glog.V(logger.Debug).Infof("%v: common ancestor: #%d [%x…]", p, start, hash[:4])
return start, nil
}
// fetchHashes61 keeps retrieving hashes from the requested number, until no more
// are returned, potentially throttling on the way.
func (d *Downloader) fetchHashes61(p *peer, td *big.Int, from uint64) error {
glog.V(logger.Debug).Infof("%v: downloading hashes from #%d", p, from)
// Create a timeout timer, and the associated hash fetcher
request := time.Now() // time of the last fetch request
timeout := time.NewTimer(0) // timer to dump a non-responsive active peer
<-timeout.C // timeout channel should be initially empty
defer timeout.Stop()
getHashes := func(from uint64) {
glog.V(logger.Detail).Infof("%v: fetching %d hashes from #%d", p, MaxHashFetch, from)
request = time.Now()
timeout.Reset(hashTTL)
go p.getAbsHashes(from, MaxHashFetch)
}
// Start pulling hashes, until all are exhausted
getHashes(from)
gotHashes := false
for {
select {
case <-d.cancelCh:
return errCancelHashFetch
case packet := <-d.hashCh:
// Make sure the active peer is giving us the hashes
if packet.PeerId() != p.id {
glog.V(logger.Debug).Infof("Received hashes from incorrect peer(%s)", packet.PeerId())
break
}
hashReqTimer.UpdateSince(request)
timeout.Stop()
// If no more hashes are inbound, notify the block fetcher and return
if packet.Items() == 0 {
glog.V(logger.Debug).Infof("%v: no available hashes", p)
select {
case d.blockWakeCh <- false:
case <-d.cancelCh:
}
// If no hashes were retrieved at all, the peer violated it's TD promise that it had a
// better chain compared to ours. The only exception is if it's promised blocks were
// already imported by other means (e.g. fetcher):
//
// R <remote peer>, L <local node>: Both at block 10
// R: Mine block 11, and propagate it to L
// L: Queue block 11 for import
// L: Notice that R's head and TD increased compared to ours, start sync
// L: Import of block 11 finishes
// L: Sync begins, and finds common ancestor at 11
// L: Request new hashes up from 11 (R's TD was higher, it must have something)
// R: Nothing to give
if !gotHashes && td.Cmp(d.getTd(d.headBlock().Hash())) > 0 {
return errStallingPeer
}
return nil
}
gotHashes = true
hashes := packet.(*hashPack).hashes
// Otherwise insert all the new hashes, aborting in case of junk
glog.V(logger.Detail).Infof("%v: scheduling %d hashes from #%d", p, len(hashes), from)
inserts := d.queue.Schedule61(hashes, true)
if len(inserts) != len(hashes) {
glog.V(logger.Debug).Infof("%v: stale hashes", p)
return errBadPeer
}
// Notify the block fetcher of new hashes, but stop if queue is full
if d.queue.PendingBlocks() < maxQueuedHashes {
// We still have hashes to fetch, send continuation wake signal (potential)
select {
case d.blockWakeCh <- true:
default:
}
} else {
// Hash limit reached, send a termination wake signal (enforced)
select {
case d.blockWakeCh <- false:
case <-d.cancelCh:
}
return nil
}
// Queue not yet full, fetch the next batch
from += uint64(len(hashes))
getHashes(from)
case <-timeout.C:
glog.V(logger.Debug).Infof("%v: hash request timed out", p)
hashTimeoutMeter.Mark(1)
return errTimeout
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
}
}
}
// fetchBlocks61 iteratively downloads the scheduled hashes, taking any available
// peers, reserving a chunk of blocks for each, waiting for delivery and also
// periodically checking for timeouts.
func (d *Downloader) fetchBlocks61(from uint64) error {
glog.V(logger.Debug).Infof("Downloading blocks from #%d", from)
defer glog.V(logger.Debug).Infof("Block download terminated")
// Create a timeout timer for scheduling expiration tasks
ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()
update := make(chan struct{}, 1)
// Fetch blocks until the hash fetcher's done
finished := false
for {
select {
case <-d.cancelCh:
return errCancelBlockFetch
case packet := <-d.blockCh:
// If the peer was previously banned and failed to deliver it's pack
// in a reasonable time frame, ignore it's message.
if peer := d.peers.Peer(packet.PeerId()); peer != nil {
blocks := packet.(*blockPack).blocks
// Deliver the received chunk of blocks and check chain validity
accepted, err := d.queue.DeliverBlocks(peer.id, blocks)
if err == errInvalidChain {
return err
}
// Unless a peer delivered something completely else than requested (usually
// caused by a timed out request which came through in the end), set it to
// idle. If the delivery's stale, the peer should have already been idled.
if err != errStaleDelivery {
peer.SetBlocksIdle(accepted)
}
// Issue a log to the user to see what's going on
switch {
case err == nil && len(blocks) == 0:
glog.V(logger.Detail).Infof("%s: no blocks delivered", peer)
case err == nil:
glog.V(logger.Detail).Infof("%s: delivered %d blocks", peer, len(blocks))
default:
glog.V(logger.Detail).Infof("%s: delivery failed: %v", peer, err)
}
}
// Blocks arrived, try to update the progress
select {
case update <- struct{}{}:
default:
}
case cont := <-d.blockWakeCh:
// The hash fetcher sent a continuation flag, check if it's done
if !cont {
finished = true
}
// Hashes arrive, try to update the progress
select {
case update <- struct{}{}:
default:
}
case <-ticker.C:
// Sanity check update the progress
select {
case update <- struct{}{}:
default:
}
case <-update:
// Short circuit if we lost all our peers
if d.peers.Len() == 0 {
return errNoPeers
}
// Check for block request timeouts and demote the responsible peers
for pid, fails := range d.queue.ExpireBlocks(blockTTL) {
if peer := d.peers.Peer(pid); peer != nil {
if fails > 1 {
glog.V(logger.Detail).Infof("%s: block delivery timeout", peer)
peer.SetBlocksIdle(0)
} else {
glog.V(logger.Debug).Infof("%s: stalling block delivery, dropping", peer)
d.dropPeer(pid)
}
}
}
// If there's nothing more to fetch, wait or terminate
if d.queue.PendingBlocks() == 0 {
if !d.queue.InFlightBlocks() && finished {
glog.V(logger.Debug).Infof("Block fetching completed")
return nil
}
break
}
// Send a download request to all idle peers, until throttled
throttled := false
idles, total := d.peers.BlockIdlePeers()
for _, peer := range idles {
// Short circuit if throttling activated
if d.queue.ShouldThrottleBlocks() {
throttled = true
break
}
// Reserve a chunk of hashes for a peer. A nil can mean either that
// no more hashes are available, or that the peer is known not to
// have them.
request := d.queue.ReserveBlocks(peer, peer.BlockCapacity(blockTargetRTT))
if request == nil {
continue
}
if glog.V(logger.Detail) {
glog.Infof("%s: requesting %d blocks", peer, len(request.Hashes))
}
// Fetch the chunk and make sure any errors return the hashes to the queue
if err := peer.Fetch61(request); err != nil {
// Although we could try and make an attempt to fix this, this error really
// means that we've double allocated a fetch task to a peer. If that is the
// case, the internal state of the downloader and the queue is very wrong so
// better hard crash and note the error instead of silently accumulating into
// a much bigger issue.
panic(fmt.Sprintf("%v: fetch assignment failed", peer))
}
}
// Make sure that we have peers available for fetching. If all peers have been tried
// and all failed throw an error
if !throttled && !d.queue.InFlightBlocks() && len(idles) == total {
return errPeersUnavailable
}
case <-d.headerCh:
case <-d.bodyCh:
case <-d.stateCh:
case <-d.receiptCh:
// Ignore eth/{62,63} packets because this is eth/61.
// These can arrive as a late delivery from a previous sync.
}
}
}
// fetchHeight retrieves the head header of the remote peer to aid in estimating // fetchHeight retrieves the head header of the remote peer to aid in estimating
// the total time a pending synchronisation would take. // the total time a pending synchronisation would take.
func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) { func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
@ -1022,11 +531,6 @@ func (d *Downloader) fetchHeight(p *peer) (*types.Header, error) {
case <-d.stateCh: case <-d.stateCh:
case <-d.receiptCh: case <-d.receiptCh:
// Out of bounds delivery, ignore // Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1067,7 +571,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
for finished := false; !finished; { for finished := false; !finished; {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelHashFetch return 0, errCancelHeaderFetch
case packet := <-d.headerCh: case packet := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -1114,11 +618,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
case <-d.stateCh: case <-d.stateCh:
case <-d.receiptCh: case <-d.receiptCh:
// Out of bounds delivery, ignore // Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
// If the head fetch already found an ancestor, return // If the head fetch already found an ancestor, return
@ -1146,7 +645,7 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
for arrived := false; !arrived; { for arrived := false; !arrived; {
select { select {
case <-d.cancelCh: case <-d.cancelCh:
return 0, errCancelHashFetch return 0, errCancelHeaderFetch
case packer := <-d.headerCh: case packer := <-d.headerCh:
// Discard anything not from the origin peer // Discard anything not from the origin peer
@ -1182,11 +681,6 @@ func (d *Downloader) findAncestor(p *peer, height uint64) (uint64, error) {
case <-d.stateCh: case <-d.stateCh:
case <-d.receiptCh: case <-d.receiptCh:
// Out of bounds delivery, ignore // Out of bounds delivery, ignore
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1305,11 +799,6 @@ func (d *Downloader) fetchHeaders(p *peer, from uint64) error {
case <-d.cancelCh: case <-d.cancelCh:
} }
return errBadPeer return errBadPeer
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1623,11 +1112,6 @@ func (d *Downloader) fetchParts(errCancel error, deliveryCh chan dataPack, deliv
if !progressed && !throttled && !running && len(idles) == total && pending() > 0 { if !progressed && !throttled && !running && len(idles) == total && pending() > 0 {
return errPeersUnavailable return errPeersUnavailable
} }
case <-d.hashCh:
case <-d.blockCh:
// Ignore eth/61 packets because this is eth/62+.
// These can arrive as a late delivery from a previous sync.
} }
} }
} }
@ -1867,19 +1351,6 @@ func (d *Downloader) processContent() error {
} }
} }
// DeliverHashes injects a new batch of hashes received from a remote node into
// the download schedule. This is usually invoked through the BlockHashesMsg by
// the protocol handler.
func (d *Downloader) DeliverHashes(id string, hashes []common.Hash) (err error) {
return d.deliver(id, d.hashCh, &hashPack{id, hashes}, hashInMeter, hashDropMeter)
}
// DeliverBlocks injects a new batch of blocks received from a remote node.
// This is usually invoked through the BlocksMsg by the protocol handler.
func (d *Downloader) DeliverBlocks(id string, blocks []*types.Block) (err error) {
return d.deliver(id, d.blockCh, &blockPack{id, blocks}, blockInMeter, blockDropMeter)
}
// DeliverHeaders injects a new batch of block headers received from a remote // DeliverHeaders injects a new batch of block headers received from a remote
// node into the download schedule. // node into the download schedule.
func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) { func (d *Downloader) DeliverHeaders(id string, headers []*types.Header) (err error) {

View File

@ -399,14 +399,12 @@ func (dl *downloadTester) newSlowPeer(id string, version int, hashes []common.Ha
var err error var err error
switch version { switch version {
case 61:
err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHashesFn(id, delay), dl.peerGetAbsHashesFn(id, delay), dl.peerGetBlocksFn(id, delay), nil, nil, nil, nil, nil)
case 62: case 62:
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil) err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), nil, nil)
case 63: case 63:
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
case 64: case 64:
err = dl.downloader.RegisterPeer(id, version, hashes[0], nil, nil, nil, dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay)) err = dl.downloader.RegisterPeer(id, version, hashes[0], dl.peerGetRelHeadersFn(id, delay), dl.peerGetAbsHeadersFn(id, delay), dl.peerGetBodiesFn(id, delay), dl.peerGetReceiptsFn(id, delay), dl.peerGetNodeDataFn(id, delay))
} }
if err == nil { if err == nil {
// Assign the owned hashes, headers and blocks to the peer (deep copy) // Assign the owned hashes, headers and blocks to the peer (deep copy)
@ -465,86 +463,6 @@ func (dl *downloadTester) dropPeer(id string) {
dl.downloader.UnregisterPeer(id) dl.downloader.UnregisterPeer(id)
} }
// peerGetRelHashesFn constructs a GetHashes function associated with a specific
// peer in the download tester. The returned function can be used to retrieve
// batches of hashes from the particularly requested peer.
func (dl *downloadTester) peerGetRelHashesFn(id string, delay time.Duration) func(head common.Hash) error {
return func(head common.Hash) error {
time.Sleep(delay)
dl.lock.RLock()
defer dl.lock.RUnlock()
// Gather the next batch of hashes
hashes := dl.peerHashes[id]
result := make([]common.Hash, 0, MaxHashFetch)
for i, hash := range hashes {
if hash == head {
i++
for len(result) < cap(result) && i < len(hashes) {
result = append(result, hashes[i])
i++
}
break
}
}
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
dl.downloader.DeliverHashes(id, result)
}()
return nil
}
}
// peerGetAbsHashesFn constructs a GetHashesFromNumber function associated with
// a particular peer in the download tester. The returned function can be used to
// retrieve batches of hashes from the particularly requested peer.
func (dl *downloadTester) peerGetAbsHashesFn(id string, delay time.Duration) func(uint64, int) error {
return func(head uint64, count int) error {
time.Sleep(delay)
dl.lock.RLock()
defer dl.lock.RUnlock()
// Gather the next batch of hashes
hashes := dl.peerHashes[id]
result := make([]common.Hash, 0, count)
for i := 0; i < count && len(hashes)-int(head)-1-i >= 0; i++ {
result = append(result, hashes[len(hashes)-int(head)-1-i])
}
// Delay delivery a bit to allow attacks to unfold
go func() {
time.Sleep(time.Millisecond)
dl.downloader.DeliverHashes(id, result)
}()
return nil
}
}
// peerGetBlocksFn constructs a getBlocks function associated with a particular
// peer in the download tester. The returned function can be used to retrieve
// batches of blocks from the particularly requested peer.
func (dl *downloadTester) peerGetBlocksFn(id string, delay time.Duration) func([]common.Hash) error {
return func(hashes []common.Hash) error {
time.Sleep(delay)
dl.lock.RLock()
defer dl.lock.RUnlock()
blocks := dl.peerBlocks[id]
result := make([]*types.Block, 0, len(hashes))
for _, hash := range hashes {
if block, ok := blocks[hash]; ok {
result = append(result, block)
}
}
go dl.downloader.DeliverBlocks(id, result)
return nil
}
}
// peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed // peerGetRelHeadersFn constructs a GetBlockHeaders function based on a hashed
// origin; associated with a particular peer in the download tester. The returned // origin; associated with a particular peer in the download tester. The returned
// function can be used to retrieve batches of headers from the particular peer. // function can be used to retrieve batches of headers from the particular peer.
@ -730,7 +648,6 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng
// Tests that simple synchronization against a canonical chain works correctly. // Tests that simple synchronization against a canonical chain works correctly.
// In this test common ancestor lookup should be short circuited and not require // In this test common ancestor lookup should be short circuited and not require
// binary searching. // binary searching.
func TestCanonicalSynchronisation61(t *testing.T) { testCanonicalSynchronisation(t, 61, FullSync) }
func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) } func TestCanonicalSynchronisation62(t *testing.T) { testCanonicalSynchronisation(t, 62, FullSync) }
func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) } func TestCanonicalSynchronisation63Full(t *testing.T) { testCanonicalSynchronisation(t, 63, FullSync) }
func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) } func TestCanonicalSynchronisation63Fast(t *testing.T) { testCanonicalSynchronisation(t, 63, FastSync) }
@ -759,7 +676,6 @@ func testCanonicalSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that if a large batch of blocks are being downloaded, it is throttled // Tests that if a large batch of blocks are being downloaded, it is throttled
// until the cached blocks are retrieved. // until the cached blocks are retrieved.
func TestThrottling61(t *testing.T) { testThrottling(t, 61, FullSync) }
func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) } func TestThrottling62(t *testing.T) { testThrottling(t, 62, FullSync) }
func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) } func TestThrottling63Full(t *testing.T) { testThrottling(t, 63, FullSync) }
func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) } func TestThrottling63Fast(t *testing.T) { testThrottling(t, 63, FastSync) }
@ -845,7 +761,6 @@ func testThrottling(t *testing.T, protocol int, mode SyncMode) {
// Tests that simple synchronization against a forked chain works correctly. In // Tests that simple synchronization against a forked chain works correctly. In
// this test common ancestor lookup should *not* be short circuited, and a full // this test common ancestor lookup should *not* be short circuited, and a full
// binary search should be executed. // binary search should be executed.
func TestForkedSync61(t *testing.T) { testForkedSync(t, 61, FullSync) }
func TestForkedSync62(t *testing.T) { testForkedSync(t, 62, FullSync) } func TestForkedSync62(t *testing.T) { testForkedSync(t, 62, FullSync) }
func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) } func TestForkedSync63Full(t *testing.T) { testForkedSync(t, 63, FullSync) }
func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) } func TestForkedSync63Fast(t *testing.T) { testForkedSync(t, 63, FastSync) }
@ -881,7 +796,6 @@ func testForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that synchronising against a much shorter but much heavyer fork works // Tests that synchronising against a much shorter but much heavyer fork works
// corrently and is not dropped. // corrently and is not dropped.
func TestHeavyForkedSync61(t *testing.T) { testHeavyForkedSync(t, 61, FullSync) }
func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) } func TestHeavyForkedSync62(t *testing.T) { testHeavyForkedSync(t, 62, FullSync) }
func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) } func TestHeavyForkedSync63Full(t *testing.T) { testHeavyForkedSync(t, 63, FullSync) }
func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) } func TestHeavyForkedSync63Fast(t *testing.T) { testHeavyForkedSync(t, 63, FastSync) }
@ -915,24 +829,9 @@ func testHeavyForkedSync(t *testing.T, protocol int, mode SyncMode) {
assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork/2 + 1}) assertOwnForkedChain(t, tester, common+1, []int{common + fork + 1, common + fork/2 + 1})
} }
// Tests that an inactive downloader will not accept incoming hashes and blocks.
func TestInactiveDownloader61(t *testing.T) {
t.Parallel()
tester := newTester()
// Check that neither hashes nor blocks are accepted
if err := tester.downloader.DeliverHashes("bad peer", []common.Hash{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
if err := tester.downloader.DeliverBlocks("bad peer", []*types.Block{}); err != errNoSyncActive {
t.Errorf("error mismatch: have %v, want %v", err, errNoSyncActive)
}
}
// Tests that chain forks are contained within a certain interval of the current // Tests that chain forks are contained within a certain interval of the current
// chain head, ensuring that malicious peers cannot waste resources by feeding // chain head, ensuring that malicious peers cannot waste resources by feeding
// long dead chains. // long dead chains.
func TestBoundedForkedSync61(t *testing.T) { testBoundedForkedSync(t, 61, FullSync) }
func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) } func TestBoundedForkedSync62(t *testing.T) { testBoundedForkedSync(t, 62, FullSync) }
func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) } func TestBoundedForkedSync63Full(t *testing.T) { testBoundedForkedSync(t, 63, FullSync) }
func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) } func TestBoundedForkedSync63Fast(t *testing.T) { testBoundedForkedSync(t, 63, FastSync) }
@ -968,7 +867,6 @@ func testBoundedForkedSync(t *testing.T, protocol int, mode SyncMode) {
// Tests that chain forks are contained within a certain interval of the current // Tests that chain forks are contained within a certain interval of the current
// chain head for short but heavy forks too. These are a bit special because they // chain head for short but heavy forks too. These are a bit special because they
// take different ancestor lookup paths. // take different ancestor lookup paths.
func TestBoundedHeavyForkedSync61(t *testing.T) { testBoundedHeavyForkedSync(t, 61, FullSync) }
func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) } func TestBoundedHeavyForkedSync62(t *testing.T) { testBoundedHeavyForkedSync(t, 62, FullSync) }
func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) } func TestBoundedHeavyForkedSync63Full(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FullSync) }
func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) } func TestBoundedHeavyForkedSync63Fast(t *testing.T) { testBoundedHeavyForkedSync(t, 63, FastSync) }
@ -1039,7 +937,6 @@ func TestInactiveDownloader63(t *testing.T) {
} }
// Tests that a canceled download wipes all previously accumulated state. // Tests that a canceled download wipes all previously accumulated state.
func TestCancel61(t *testing.T) { testCancel(t, 61, FullSync) }
func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) } func TestCancel62(t *testing.T) { testCancel(t, 62, FullSync) }
func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) } func TestCancel63Full(t *testing.T) { testCancel(t, 63, FullSync) }
func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) } func TestCancel63Fast(t *testing.T) { testCancel(t, 63, FastSync) }
@ -1081,7 +978,6 @@ func testCancel(t *testing.T, protocol int, mode SyncMode) {
} }
// Tests that synchronisation from multiple peers works as intended (multi thread sanity test). // Tests that synchronisation from multiple peers works as intended (multi thread sanity test).
func TestMultiSynchronisation61(t *testing.T) { testMultiSynchronisation(t, 61, FullSync) }
func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) } func TestMultiSynchronisation62(t *testing.T) { testMultiSynchronisation(t, 62, FullSync) }
func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) } func TestMultiSynchronisation63Full(t *testing.T) { testMultiSynchronisation(t, 63, FullSync) }
func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) } func TestMultiSynchronisation63Fast(t *testing.T) { testMultiSynchronisation(t, 63, FastSync) }
@ -1112,7 +1008,6 @@ func testMultiSynchronisation(t *testing.T, protocol int, mode SyncMode) {
// Tests that synchronisations behave well in multi-version protocol environments // Tests that synchronisations behave well in multi-version protocol environments
// and not wreak havoc on other nodes in the network. // and not wreak havoc on other nodes in the network.
func TestMultiProtoSynchronisation61(t *testing.T) { testMultiProtoSync(t, 61, FullSync) }
func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) } func TestMultiProtoSynchronisation62(t *testing.T) { testMultiProtoSync(t, 62, FullSync) }
func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) } func TestMultiProtoSynchronisation63Full(t *testing.T) { testMultiProtoSync(t, 63, FullSync) }
func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) } func TestMultiProtoSynchronisation63Fast(t *testing.T) { testMultiProtoSync(t, 63, FastSync) }
@ -1131,7 +1026,6 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
tester := newTester() tester := newTester()
defer tester.terminate() defer tester.terminate()
tester.newPeer("peer 61", 61, hashes, nil, blocks, nil)
tester.newPeer("peer 62", 62, hashes, headers, blocks, nil) tester.newPeer("peer 62", 62, hashes, headers, blocks, nil)
tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts) tester.newPeer("peer 63", 63, hashes, headers, blocks, receipts)
tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts) tester.newPeer("peer 64", 64, hashes, headers, blocks, receipts)
@ -1143,7 +1037,7 @@ func testMultiProtoSync(t *testing.T, protocol int, mode SyncMode) {
assertOwnChain(t, tester, targetBlocks+1) assertOwnChain(t, tester, targetBlocks+1)
// Check that no peers have been dropped off // Check that no peers have been dropped off
for _, version := range []int{61, 62, 63, 64} { for _, version := range []int{62, 63, 64} {
peer := fmt.Sprintf("peer %d", version) peer := fmt.Sprintf("peer %d", version)
if _, ok := tester.peerHashes[peer]; !ok { if _, ok := tester.peerHashes[peer]; !ok {
t.Errorf("%s dropped", peer) t.Errorf("%s dropped", peer)
@ -1368,7 +1262,6 @@ func testInvalidHeaderRollback(t *testing.T, protocol int, mode SyncMode) {
// Tests that a peer advertising an high TD doesn't get to stall the downloader // Tests that a peer advertising an high TD doesn't get to stall the downloader
// afterwards by not sending any useful hashes. // afterwards by not sending any useful hashes.
func TestHighTDStarvationAttack61(t *testing.T) { testHighTDStarvationAttack(t, 61, FullSync) }
func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) } func TestHighTDStarvationAttack62(t *testing.T) { testHighTDStarvationAttack(t, 62, FullSync) }
func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) } func TestHighTDStarvationAttack63Full(t *testing.T) { testHighTDStarvationAttack(t, 63, FullSync) }
func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) } func TestHighTDStarvationAttack63Fast(t *testing.T) { testHighTDStarvationAttack(t, 63, FastSync) }
@ -1391,7 +1284,6 @@ func testHighTDStarvationAttack(t *testing.T, protocol int, mode SyncMode) {
} }
// Tests that misbehaving peers are disconnected, whilst behaving ones are not. // Tests that misbehaving peers are disconnected, whilst behaving ones are not.
func TestBlockHeaderAttackerDropping61(t *testing.T) { testBlockHeaderAttackerDropping(t, 61) }
func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) } func TestBlockHeaderAttackerDropping62(t *testing.T) { testBlockHeaderAttackerDropping(t, 62) }
func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) } func TestBlockHeaderAttackerDropping63(t *testing.T) { testBlockHeaderAttackerDropping(t, 63) }
func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) } func TestBlockHeaderAttackerDropping64(t *testing.T) { testBlockHeaderAttackerDropping(t, 64) }
@ -1409,7 +1301,6 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errStallingPeer, true}, // Peer was detected to be stalling, drop it {errStallingPeer, true}, // Peer was detected to be stalling, drop it
{errNoPeers, false}, // No peers to download from, soft race, no issue {errNoPeers, false}, // No peers to download from, soft race, no issue
{errTimeout, true}, // No hashes received in due time, drop the peer {errTimeout, true}, // No hashes received in due time, drop the peer
{errEmptyHashSet, true}, // No hashes were returned as a response, drop as it's a dead end
{errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end {errEmptyHeaderSet, true}, // No headers were returned as a response, drop as it's a dead end
{errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser {errPeersUnavailable, true}, // Nobody had the advertised blocks, drop the advertiser
{errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter {errInvalidAncestor, true}, // Agreed upon ancestor is not acceptable, drop the chain rewriter
@ -1417,7 +1308,6 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
{errInvalidBlock, false}, // A bad peer was detected, but not the sync origin {errInvalidBlock, false}, // A bad peer was detected, but not the sync origin
{errInvalidBody, false}, // A bad peer was detected, but not the sync origin {errInvalidBody, false}, // A bad peer was detected, but not the sync origin
{errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin {errInvalidReceipt, false}, // A bad peer was detected, but not the sync origin
{errCancelHashFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBlockFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelHeaderFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
{errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop {errCancelBodyFetch, false}, // Synchronisation was canceled, origin may be innocent, don't drop
@ -1450,7 +1340,6 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol int) {
// Tests that synchronisation progress (origin block number, current block number // Tests that synchronisation progress (origin block number, current block number
// and highest block number) is tracked and updated correctly. // and highest block number) is tracked and updated correctly.
func TestSyncProgress61(t *testing.T) { testSyncProgress(t, 61, FullSync) }
func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) } func TestSyncProgress62(t *testing.T) { testSyncProgress(t, 62, FullSync) }
func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) } func TestSyncProgress63Full(t *testing.T) { testSyncProgress(t, 63, FullSync) }
func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) } func TestSyncProgress63Fast(t *testing.T) { testSyncProgress(t, 63, FastSync) }
@ -1524,7 +1413,6 @@ func testSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// Tests that synchronisation progress (origin block number and highest block // Tests that synchronisation progress (origin block number and highest block
// number) is tracked and updated correctly in case of a fork (or manual head // number) is tracked and updated correctly in case of a fork (or manual head
// revertal). // revertal).
func TestForkedSyncProgress61(t *testing.T) { testForkedSyncProgress(t, 61, FullSync) }
func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) } func TestForkedSyncProgress62(t *testing.T) { testForkedSyncProgress(t, 62, FullSync) }
func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) } func TestForkedSyncProgress63Full(t *testing.T) { testForkedSyncProgress(t, 63, FullSync) }
func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) } func TestForkedSyncProgress63Fast(t *testing.T) { testForkedSyncProgress(t, 63, FastSync) }
@ -1601,7 +1489,6 @@ func testForkedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// Tests that if synchronisation is aborted due to some failure, then the progress // Tests that if synchronisation is aborted due to some failure, then the progress
// origin is not updated in the next sync cycle, as it should be considered the // origin is not updated in the next sync cycle, as it should be considered the
// continuation of the previous sync and not a new instance. // continuation of the previous sync and not a new instance.
func TestFailedSyncProgress61(t *testing.T) { testFailedSyncProgress(t, 61, FullSync) }
func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) } func TestFailedSyncProgress62(t *testing.T) { testFailedSyncProgress(t, 62, FullSync) }
func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) } func TestFailedSyncProgress63Full(t *testing.T) { testFailedSyncProgress(t, 63, FullSync) }
func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) } func TestFailedSyncProgress63Fast(t *testing.T) { testFailedSyncProgress(t, 63, FastSync) }
@ -1679,7 +1566,6 @@ func testFailedSyncProgress(t *testing.T, protocol int, mode SyncMode) {
// Tests that if an attacker fakes a chain height, after the attack is detected, // Tests that if an attacker fakes a chain height, after the attack is detected,
// the progress height is successfully reduced at the next sync invocation. // the progress height is successfully reduced at the next sync invocation.
func TestFakedSyncProgress61(t *testing.T) { testFakedSyncProgress(t, 61, FullSync) }
func TestFakedSyncProgress62(t *testing.T) { testFakedSyncProgress(t, 62, FullSync) } func TestFakedSyncProgress62(t *testing.T) { testFakedSyncProgress(t, 62, FullSync) }
func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) } func TestFakedSyncProgress63Full(t *testing.T) { testFakedSyncProgress(t, 63, FullSync) }
func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) } func TestFakedSyncProgress63Fast(t *testing.T) { testFakedSyncProgress(t, 63, FastSync) }

View File

@ -23,16 +23,6 @@ import (
) )
var ( var (
hashInMeter = metrics.NewMeter("eth/downloader/hashes/in")
hashReqTimer = metrics.NewTimer("eth/downloader/hashes/req")
hashDropMeter = metrics.NewMeter("eth/downloader/hashes/drop")
hashTimeoutMeter = metrics.NewMeter("eth/downloader/hashes/timeout")
blockInMeter = metrics.NewMeter("eth/downloader/blocks/in")
blockReqTimer = metrics.NewTimer("eth/downloader/blocks/req")
blockDropMeter = metrics.NewMeter("eth/downloader/blocks/drop")
blockTimeoutMeter = metrics.NewMeter("eth/downloader/blocks/timeout")
headerInMeter = metrics.NewMeter("eth/downloader/headers/in") headerInMeter = metrics.NewMeter("eth/downloader/headers/in")
headerReqTimer = metrics.NewTimer("eth/downloader/headers/req") headerReqTimer = metrics.NewTimer("eth/downloader/headers/req")
headerDropMeter = metrics.NewMeter("eth/downloader/headers/drop") headerDropMeter = metrics.NewMeter("eth/downloader/headers/drop")

View File

@ -37,11 +37,6 @@ const (
measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value. measurementImpact = 0.1 // The impact a single measurement has on a peer's final throughput value.
) )
// Hash and block fetchers belonging to eth/61 and below
type relativeHashFetcherFn func(common.Hash) error
type absoluteHashFetcherFn func(uint64, int) error
type blockFetcherFn func([]common.Hash) error
// Block header and body fetchers belonging to eth/62 and above // Block header and body fetchers belonging to eth/62 and above
type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error type relativeHeaderFetcherFn func(common.Hash, int, int, bool) error
type absoluteHeaderFetcherFn func(uint64, int, int, bool) error type absoluteHeaderFetcherFn func(uint64, int, int, bool) error
@ -79,10 +74,6 @@ type peer struct {
lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously) lacking map[common.Hash]struct{} // Set of hashes not to request (didn't have previously)
getRelHashes relativeHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an origin hash
getAbsHashes absoluteHashFetcherFn // [eth/61] Method to retrieve a batch of hashes from an absolute position
getBlocks blockFetcherFn // [eth/61] Method to retrieve a batch of blocks
getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash getRelHeaders relativeHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an origin hash
getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position getAbsHeaders absoluteHeaderFetcherFn // [eth/62] Method to retrieve a batch of headers from an absolute position
getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies getBlockBodies blockBodyFetcherFn // [eth/62] Method to retrieve a batch of block bodies
@ -97,7 +88,6 @@ type peer struct {
// newPeer create a new downloader peer, with specific hash and block retrieval // newPeer create a new downloader peer, with specific hash and block retrieval
// mechanisms. // mechanisms.
func newPeer(id string, version int, head common.Hash, func newPeer(id string, version int, head common.Hash,
getRelHashes relativeHashFetcherFn, getAbsHashes absoluteHashFetcherFn, getBlocks blockFetcherFn, // eth/61 callbacks, remove when upgrading
getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn, getRelHeaders relativeHeaderFetcherFn, getAbsHeaders absoluteHeaderFetcherFn, getBlockBodies blockBodyFetcherFn,
getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer { getReceipts receiptFetcherFn, getNodeData stateFetcherFn) *peer {
return &peer{ return &peer{
@ -105,10 +95,6 @@ func newPeer(id string, version int, head common.Hash,
head: head, head: head,
lacking: make(map[common.Hash]struct{}), lacking: make(map[common.Hash]struct{}),
getRelHashes: getRelHashes,
getAbsHashes: getAbsHashes,
getBlocks: getBlocks,
getRelHeaders: getRelHeaders, getRelHeaders: getRelHeaders,
getAbsHeaders: getAbsHeaders, getAbsHeaders: getAbsHeaders,
getBlockBodies: getBlockBodies, getBlockBodies: getBlockBodies,
@ -138,28 +124,6 @@ func (p *peer) Reset() {
p.lacking = make(map[common.Hash]struct{}) p.lacking = make(map[common.Hash]struct{})
} }
// Fetch61 sends a block retrieval request to the remote peer.
func (p *peer) Fetch61(request *fetchRequest) error {
// Sanity check the protocol version
if p.version != 61 {
panic(fmt.Sprintf("block fetch [eth/61] requested on eth/%d", p.version))
}
// Short circuit if the peer is already fetching
if !atomic.CompareAndSwapInt32(&p.blockIdle, 0, 1) {
return errAlreadyFetching
}
p.blockStarted = time.Now()
// Convert the hash set to a retrievable slice
hashes := make([]common.Hash, 0, len(request.Hashes))
for hash, _ := range request.Hashes {
hashes = append(hashes, hash)
}
go p.getBlocks(hashes)
return nil
}
// FetchHeaders sends a header retrieval request to the remote peer. // FetchHeaders sends a header retrieval request to the remote peer.
func (p *peer) FetchHeaders(from uint64, count int) error { func (p *peer) FetchHeaders(from uint64, count int) error {
// Sanity check the protocol version // Sanity check the protocol version
@ -481,20 +445,6 @@ func (ps *peerSet) AllPeers() []*peer {
return list return list
} }
// BlockIdlePeers retrieves a flat list of all the currently idle peers within the
// active peer set, ordered by their reputation.
func (ps *peerSet) BlockIdlePeers() ([]*peer, int) {
idle := func(p *peer) bool {
return atomic.LoadInt32(&p.blockIdle) == 0
}
throughput := func(p *peer) float64 {
p.lock.RLock()
defer p.lock.RUnlock()
return p.blockThroughput
}
return ps.idlePeers(61, 61, idle, throughput)
}
// HeaderIdlePeers retrieves a flat list of all the currently header-idle peers // HeaderIdlePeers retrieves a flat list of all the currently header-idle peers
// within the active peer set, ordered by their reputation. // within the active peer set, ordered by their reputation.
func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) { func (ps *peerSet) HeaderIdlePeers() ([]*peer, int) {

View File

@ -45,7 +45,6 @@ var (
var ( var (
errNoFetchesPending = errors.New("no fetches pending") errNoFetchesPending = errors.New("no fetches pending")
errStateSyncPending = errors.New("state trie sync already scheduled")
errStaleDelivery = errors.New("stale delivery") errStaleDelivery = errors.New("stale delivery")
) )
@ -74,10 +73,6 @@ type queue struct {
mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching mode SyncMode // Synchronisation mode to decide on the block parts to schedule for fetching
fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode fastSyncPivot uint64 // Block number where the fast sync pivots into archive synchronisation mode
hashPool map[common.Hash]int // [eth/61] Pending hashes, mapping to their insertion index (priority)
hashQueue *prque.Prque // [eth/61] Priority queue of the block hashes to fetch
hashCounter int // [eth/61] Counter indexing the added hashes to ensure retrieval order
headerHead common.Hash // [eth/62] Hash of the last queued header to verify order headerHead common.Hash // [eth/62] Hash of the last queued header to verify order
// Headers are "special", they download in batches, supported by a skeleton chain // Headers are "special", they download in batches, supported by a skeleton chain
@ -85,7 +80,6 @@ type queue struct {
headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for headerTaskQueue *prque.Prque // [eth/62] Priority queue of the skeleton indexes to fetch the filling headers for
headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable headerPeerMiss map[string]map[uint64]struct{} // [eth/62] Set of per-peer header batches known to be unavailable
headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations headerPendPool map[string]*fetchRequest // [eth/62] Currently pending header retrieval operations
headerDonePool map[uint64]struct{} // [eth/62] Set of the completed header fetches
headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers headerResults []*types.Header // [eth/62] Result cache accumulating the completed headers
headerProced int // [eth/62] Number of headers already processed from the results headerProced int // [eth/62] Number of headers already processed from the results
headerOffset uint64 // [eth/62] Number of the first header in the result cache headerOffset uint64 // [eth/62] Number of the first header in the result cache
@ -124,8 +118,6 @@ type queue struct {
func newQueue(stateDb ethdb.Database) *queue { func newQueue(stateDb ethdb.Database) *queue {
lock := new(sync.Mutex) lock := new(sync.Mutex)
return &queue{ return &queue{
hashPool: make(map[common.Hash]int),
hashQueue: prque.New(),
headerPendPool: make(map[string]*fetchRequest), headerPendPool: make(map[string]*fetchRequest),
headerContCh: make(chan bool), headerContCh: make(chan bool),
blockTaskPool: make(map[common.Hash]*types.Header), blockTaskPool: make(map[common.Hash]*types.Header),
@ -158,10 +150,6 @@ func (q *queue) Reset() {
q.mode = FullSync q.mode = FullSync
q.fastSyncPivot = 0 q.fastSyncPivot = 0
q.hashPool = make(map[common.Hash]int)
q.hashQueue.Reset()
q.hashCounter = 0
q.headerHead = common.Hash{} q.headerHead = common.Hash{}
q.headerPendPool = make(map[string]*fetchRequest) q.headerPendPool = make(map[string]*fetchRequest)
@ -208,7 +196,7 @@ func (q *queue) PendingBlocks() int {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
return q.hashQueue.Size() + q.blockTaskQueue.Size() return q.blockTaskQueue.Size()
} }
// PendingReceipts retrieves the number of block receipts pending for retrieval. // PendingReceipts retrieves the number of block receipts pending for retrieval.
@ -272,7 +260,7 @@ func (q *queue) Idle() bool {
q.lock.Lock() q.lock.Lock()
defer q.lock.Unlock() defer q.lock.Unlock()
queued := q.hashQueue.Size() + q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size() queued := q.blockTaskQueue.Size() + q.receiptTaskQueue.Size() + q.stateTaskQueue.Size()
pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool) pending := len(q.blockPendPool) + len(q.receiptPendPool) + len(q.statePendPool)
cached := len(q.blockDonePool) + len(q.receiptDonePool) cached := len(q.blockDonePool) + len(q.receiptDonePool)
@ -323,34 +311,6 @@ func (q *queue) ShouldThrottleReceipts() bool {
return pending >= len(q.resultCache)-len(q.receiptDonePool) return pending >= len(q.resultCache)-len(q.receiptDonePool)
} }
// Schedule61 adds a set of hashes for the download queue for scheduling, returning
// the new hashes encountered.
func (q *queue) Schedule61(hashes []common.Hash, fifo bool) []common.Hash {
q.lock.Lock()
defer q.lock.Unlock()
// Insert all the hashes prioritised in the arrival order
inserts := make([]common.Hash, 0, len(hashes))
for _, hash := range hashes {
// Skip anything we already have
if old, ok := q.hashPool[hash]; ok {
glog.V(logger.Warn).Infof("Hash %x already scheduled at index %v", hash, old)
continue
}
// Update the counters and insert the hash
q.hashCounter = q.hashCounter + 1
inserts = append(inserts, hash)
q.hashPool[hash] = q.hashCounter
if fifo {
q.hashQueue.Push(hash, -float32(q.hashCounter)) // Lowest gets schedules first
} else {
q.hashQueue.Push(hash, float32(q.hashCounter)) // Highest gets schedules first
}
}
return inserts
}
// ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill // ScheduleSkeleton adds a batch of header retrieval tasks to the queue to fill
// up an already retrieved header skeleton. // up an already retrieved header skeleton.
func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) { func (q *queue) ScheduleSkeleton(from uint64, skeleton []*types.Header) {
@ -550,15 +510,6 @@ func (q *queue) ReserveHeaders(p *peer, count int) *fetchRequest {
return request return request
} }
// ReserveBlocks reserves a set of block hashes for the given peer, skipping any
// previously failed download.
func (q *queue) ReserveBlocks(p *peer, count int) *fetchRequest {
q.lock.Lock()
defer q.lock.Unlock()
return q.reserveHashes(p, count, q.hashQueue, nil, q.blockPendPool, len(q.resultCache)-len(q.blockDonePool))
}
// ReserveNodeData reserves a set of node data hashes for the given peer, skipping // ReserveNodeData reserves a set of node data hashes for the given peer, skipping
// any previously failed download. // any previously failed download.
func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest { func (q *queue) ReserveNodeData(p *peer, count int) *fetchRequest {
@ -753,11 +704,6 @@ func (q *queue) CancelHeaders(request *fetchRequest) {
q.cancel(request, q.headerTaskQueue, q.headerPendPool) q.cancel(request, q.headerTaskQueue, q.headerPendPool)
} }
// CancelBlocks aborts a fetch request, returning all pending hashes to the queue.
func (q *queue) CancelBlocks(request *fetchRequest) {
q.cancel(request, q.hashQueue, q.blockPendPool)
}
// CancelBodies aborts a body fetch request, returning all pending headers to the // CancelBodies aborts a body fetch request, returning all pending headers to the
// task queue. // task queue.
func (q *queue) CancelBodies(request *fetchRequest) { func (q *queue) CancelBodies(request *fetchRequest) {
@ -801,9 +747,6 @@ func (q *queue) Revoke(peerId string) {
defer q.lock.Unlock() defer q.lock.Unlock()
if request, ok := q.blockPendPool[peerId]; ok { if request, ok := q.blockPendPool[peerId]; ok {
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
for _, header := range request.Headers { for _, header := range request.Headers {
q.blockTaskQueue.Push(header, -float32(header.Number.Uint64())) q.blockTaskQueue.Push(header, -float32(header.Number.Uint64()))
} }
@ -832,15 +775,6 @@ func (q *queue) ExpireHeaders(timeout time.Duration) map[string]int {
return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter) return q.expire(timeout, q.headerPendPool, q.headerTaskQueue, headerTimeoutMeter)
} }
// ExpireBlocks checks for in flight requests that exceeded a timeout allowance,
// canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBlocks(timeout time.Duration) map[string]int {
q.lock.Lock()
defer q.lock.Unlock()
return q.expire(timeout, q.blockPendPool, q.hashQueue, blockTimeoutMeter)
}
// ExpireBodies checks for in flight block body requests that exceeded a timeout // ExpireBodies checks for in flight block body requests that exceeded a timeout
// allowance, canceling them and returning the responsible peers for penalisation. // allowance, canceling them and returning the responsible peers for penalisation.
func (q *queue) ExpireBodies(timeout time.Duration) map[string]int { func (q *queue) ExpireBodies(timeout time.Duration) map[string]int {
@ -907,74 +841,6 @@ func (q *queue) expire(timeout time.Duration, pendPool map[string]*fetchRequest,
return expiries return expiries
} }
// DeliverBlocks injects a block retrieval response into the download queue. The
// method returns the number of blocks accepted from the delivery and also wakes
// any threads waiting for data delivery.
func (q *queue) DeliverBlocks(id string, blocks []*types.Block) (int, error) {
q.lock.Lock()
defer q.lock.Unlock()
// Short circuit if the blocks were never requested
request := q.blockPendPool[id]
if request == nil {
return 0, errNoFetchesPending
}
blockReqTimer.UpdateSince(request.Time)
delete(q.blockPendPool, id)
// If no blocks were retrieved, mark them as unavailable for the origin peer
if len(blocks) == 0 {
for hash, _ := range request.Hashes {
request.Peer.MarkLacking(hash)
}
}
// Iterate over the downloaded blocks and add each of them
accepted, errs := 0, make([]error, 0)
for _, block := range blocks {
// Skip any blocks that were not requested
hash := block.Hash()
if _, ok := request.Hashes[hash]; !ok {
errs = append(errs, fmt.Errorf("non-requested block %x", hash))
continue
}
// Reconstruct the next result if contents match up
index := int(block.Number().Int64() - int64(q.resultOffset))
if index >= len(q.resultCache) || index < 0 {
errs = []error{errInvalidChain}
break
}
q.resultCache[index] = &fetchResult{
Header: block.Header(),
Transactions: block.Transactions(),
Uncles: block.Uncles(),
}
q.blockDonePool[block.Hash()] = struct{}{}
delete(request.Hashes, hash)
delete(q.hashPool, hash)
accepted++
}
// Return all failed or missing fetches to the queue
for hash, index := range request.Hashes {
q.hashQueue.Push(hash, float32(index))
}
// Wake up WaitResults
if accepted > 0 {
q.active.Signal()
}
// If none of the blocks were good, it's a stale delivery
switch {
case len(errs) == 0:
return accepted, nil
case len(errs) == 1 && (errs[0] == errInvalidChain || errs[0] == errInvalidBlock):
return accepted, errs[0]
case len(errs) == len(blocks):
return accepted, errStaleDelivery
default:
return accepted, fmt.Errorf("multiple failures: %v", errs)
}
}
// DeliverHeaders injects a header retrieval response into the header results // DeliverHeaders injects a header retrieval response into the header results
// cache. This method either accepts all headers it received, or none of them // cache. This method either accepts all headers it received, or none of them
// if they do not map correctly to the skeleton. // if they do not map correctly to the skeleton.

View File

@ -73,26 +73,6 @@ type dataPack interface {
Stats() string Stats() string
} }
// hashPack is a batch of block hashes returned by a peer (eth/61).
type hashPack struct {
peerId string
hashes []common.Hash
}
func (p *hashPack) PeerId() string { return p.peerId }
func (p *hashPack) Items() int { return len(p.hashes) }
func (p *hashPack) Stats() string { return fmt.Sprintf("%d", len(p.hashes)) }
// blockPack is a batch of blocks returned by a peer (eth/61).
type blockPack struct {
peerId string
blocks []*types.Block
}
func (p *blockPack) PeerId() string { return p.peerId }
func (p *blockPack) Items() int { return len(p.blocks) }
func (p *blockPack) Stats() string { return fmt.Sprintf("%d", len(p.blocks)) }
// headerPack is a batch of block headers returned by a peer. // headerPack is a batch of block headers returned by a peer.
type headerPack struct { type headerPack struct {
peerId string peerId string

View File

@ -48,9 +48,6 @@ var (
// blockRetrievalFn is a callback type for retrieving a block from the local chain. // blockRetrievalFn is a callback type for retrieving a block from the local chain.
type blockRetrievalFn func(common.Hash) *types.Block type blockRetrievalFn func(common.Hash) *types.Block
// blockRequesterFn is a callback type for sending a block retrieval request.
type blockRequesterFn func([]common.Hash) error
// headerRequesterFn is a callback type for sending a header retrieval request. // headerRequesterFn is a callback type for sending a header retrieval request.
type headerRequesterFn func(common.Hash) error type headerRequesterFn func(common.Hash) error
@ -82,7 +79,6 @@ type announce struct {
origin string // Identifier of the peer originating the notification origin string // Identifier of the peer originating the notification
fetch61 blockRequesterFn // [eth/61] Fetcher function to retrieve an announced block
fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block fetchHeader headerRequesterFn // [eth/62] Fetcher function to retrieve the header of an announced block
fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block fetchBodies bodyRequesterFn // [eth/62] Fetcher function to retrieve the body of an announced block
} }
@ -191,14 +187,12 @@ func (f *Fetcher) Stop() {
// Notify announces the fetcher of the potential availability of a new block in // Notify announces the fetcher of the potential availability of a new block in
// the network. // the network.
func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
blockFetcher blockRequesterFn, // eth/61 specific whole block fetcher
headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
block := &announce{ block := &announce{
hash: hash, hash: hash,
number: number, number: number,
time: time, time: time,
origin: peer, origin: peer,
fetch61: blockFetcher,
fetchHeader: headerFetcher, fetchHeader: headerFetcher,
fetchBodies: bodyFetcher, fetchBodies: bodyFetcher,
} }
@ -224,34 +218,6 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
} }
} }
// FilterBlocks extracts all the blocks that were explicitly requested by the fetcher,
// returning those that should be handled differently.
func (f *Fetcher) FilterBlocks(blocks types.Blocks) types.Blocks {
glog.V(logger.Detail).Infof("[eth/61] filtering %d blocks", len(blocks))
// Send the filter channel to the fetcher
filter := make(chan []*types.Block)
select {
case f.blockFilter <- filter:
case <-f.quit:
return nil
}
// Request the filtering of the block list
select {
case filter <- blocks:
case <-f.quit:
return nil
}
// Retrieve the blocks remaining after filtering
select {
case blocks := <-filter:
return blocks
case <-f.quit:
return nil
}
}
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently. // returning those that should be handled differently.
func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header { func (f *Fetcher) FilterHeaders(headers []*types.Header, time time.Time) []*types.Header {
@ -413,7 +379,7 @@ func (f *Fetcher) loop() {
} }
} }
} }
// Send out all block (eth/61) or header (eth/62) requests // Send out all block header requests
for peer, hashes := range request { for peer, hashes := range request {
if glog.V(logger.Detail) && len(hashes) > 0 { if glog.V(logger.Detail) && len(hashes) > 0 {
list := "[" list := "["
@ -421,30 +387,18 @@ func (f *Fetcher) loop() {
list += fmt.Sprintf("%x…, ", hash[:4]) list += fmt.Sprintf("%x…, ", hash[:4])
} }
list = list[:len(list)-2] + "]" list = list[:len(list)-2] + "]"
if f.fetching[hashes[0]].fetch61 != nil {
glog.V(logger.Detail).Infof("[eth/61] Peer %s: fetching blocks %s", peer, list)
} else {
glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching headers %s", peer, list) glog.V(logger.Detail).Infof("[eth/62] Peer %s: fetching headers %s", peer, list)
} }
}
// Create a closure of the fetch and schedule in on a new thread // Create a closure of the fetch and schedule in on a new thread
fetchBlocks, fetchHeader, hashes := f.fetching[hashes[0]].fetch61, f.fetching[hashes[0]].fetchHeader, hashes fetchHeader, hashes := f.fetching[hashes[0]].fetchHeader, hashes
go func() { go func() {
if f.fetchingHook != nil { if f.fetchingHook != nil {
f.fetchingHook(hashes) f.fetchingHook(hashes)
} }
if fetchBlocks != nil {
// Use old eth/61 protocol to retrieve whole blocks
blockFetchMeter.Mark(int64(len(hashes)))
fetchBlocks(hashes)
} else {
// Use new eth/62 protocol to retrieve headers first
for _, hash := range hashes { for _, hash := range hashes {
headerFetchMeter.Mark(1) headerFetchMeter.Mark(1)
fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals fetchHeader(hash) // Suboptimal, but protocol doesn't allow batch header retrievals
} }
}
}() }()
} }
// Schedule the next fetch if blocks are still pending // Schedule the next fetch if blocks are still pending
@ -486,46 +440,6 @@ func (f *Fetcher) loop() {
// Schedule the next fetch if blocks are still pending // Schedule the next fetch if blocks are still pending
f.rescheduleComplete(completeTimer) f.rescheduleComplete(completeTimer)
case filter := <-f.blockFilter:
// Blocks arrived, extract any explicit fetches, return all else
var blocks types.Blocks
select {
case blocks = <-filter:
case <-f.quit:
return
}
blockFilterInMeter.Mark(int64(len(blocks)))
explicit, download := []*types.Block{}, []*types.Block{}
for _, block := range blocks {
hash := block.Hash()
// Filter explicitly requested blocks from hash announcements
if f.fetching[hash] != nil && f.queued[hash] == nil {
// Discard if already imported by other means
if f.getBlock(hash) == nil {
explicit = append(explicit, block)
} else {
f.forgetHash(hash)
}
} else {
download = append(download, block)
}
}
blockFilterOutMeter.Mark(int64(len(download)))
select {
case filter <- download:
case <-f.quit:
return
}
// Schedule the retrieved blocks for ordered import
for _, block := range explicit {
if announce := f.fetching[block.Hash()]; announce != nil {
f.enqueue(announce.origin, block)
}
}
case filter := <-f.headerFilter: case filter := <-f.headerFilter:
// Headers arrived from a remote peer. Extract those that were explicitly // Headers arrived from a remote peer. Extract those that were explicitly
// requested by the fetcher, and return everything else so it's delivered // requested by the fetcher, and return everything else so it's delivered

View File

@ -151,28 +151,6 @@ func (f *fetcherTester) dropPeer(peer string) {
f.drops[peer] = true f.drops[peer] = true
} }
// makeBlockFetcher retrieves a block fetcher associated with a simulated peer.
func (f *fetcherTester) makeBlockFetcher(blocks map[common.Hash]*types.Block) blockRequesterFn {
closure := make(map[common.Hash]*types.Block)
for hash, block := range blocks {
closure[hash] = block
}
// Create a function that returns blocks from the closure
return func(hashes []common.Hash) error {
// Gather the blocks to return
blocks := make([]*types.Block, 0, len(hashes))
for _, hash := range hashes {
if block, ok := closure[hash]; ok {
blocks = append(blocks, block)
}
}
// Return on a new thread
go f.fetcher.FilterBlocks(blocks)
return nil
}
}
// makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer. // makeHeaderFetcher retrieves a block header fetcher associated with a simulated peer.
func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn { func (f *fetcherTester) makeHeaderFetcher(blocks map[common.Hash]*types.Block, drift time.Duration) headerRequesterFn {
closure := make(map[common.Hash]*types.Block) closure := make(map[common.Hash]*types.Block)
@ -293,7 +271,6 @@ func verifyImportDone(t *testing.T, imported chan *types.Block) {
// Tests that a fetcher accepts block announcements and initiates retrievals for // Tests that a fetcher accepts block announcements and initiates retrievals for
// them, successfully importing into the local chain. // them, successfully importing into the local chain.
func TestSequentialAnnouncements61(t *testing.T) { testSequentialAnnouncements(t, 61) }
func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) } func TestSequentialAnnouncements62(t *testing.T) { testSequentialAnnouncements(t, 62) }
func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) } func TestSequentialAnnouncements63(t *testing.T) { testSequentialAnnouncements(t, 63) }
func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) } func TestSequentialAnnouncements64(t *testing.T) { testSequentialAnnouncements(t, 64) }
@ -304,7 +281,6 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@ -313,11 +289,7 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block } tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- { for i := len(hashes) - 2; i >= 0; i-- {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
verifyImportEvent(t, imported, true) verifyImportEvent(t, imported, true)
} }
verifyImportDone(t, imported) verifyImportDone(t, imported)
@ -325,7 +297,6 @@ func testSequentialAnnouncements(t *testing.T, protocol int) {
// Tests that if blocks are announced by multiple peers (or even the same buggy // Tests that if blocks are announced by multiple peers (or even the same buggy
// peer), they will only get downloaded at most once. // peer), they will only get downloaded at most once.
func TestConcurrentAnnouncements61(t *testing.T) { testConcurrentAnnouncements(t, 61) }
func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) } func TestConcurrentAnnouncements62(t *testing.T) { testConcurrentAnnouncements(t, 62) }
func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) } func TestConcurrentAnnouncements63(t *testing.T) { testConcurrentAnnouncements(t, 63) }
func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) } func TestConcurrentAnnouncements64(t *testing.T) { testConcurrentAnnouncements(t, 64) }
@ -337,15 +308,10 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
// Assemble a tester with a built in counter for the requests // Assemble a tester with a built in counter for the requests
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
counter := uint32(0) counter := uint32(0)
blockWrapper := func(hashes []common.Hash) error {
atomic.AddUint32(&counter, uint32(len(hashes)))
return blockFetcher(hashes)
}
headerWrapper := func(hash common.Hash) error { headerWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1) atomic.AddUint32(&counter, 1)
return headerFetcher(hash) return headerFetcher(hash)
@ -355,15 +321,9 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block } tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- { for i := len(hashes) - 2; i >= 0; i-- {
if protocol < 62 { tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
tester.fetcher.Notify("first", hashes[i], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil) tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), headerWrapper, bodyFetcher)
tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout+time.Millisecond), blockWrapper, nil, nil) tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), headerWrapper, bodyFetcher)
tester.fetcher.Notify("second", hashes[i], 0, time.Now().Add(-arriveTimeout-time.Millisecond), blockWrapper, nil, nil)
} else {
tester.fetcher.Notify("first", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout+time.Millisecond), nil, headerWrapper, bodyFetcher)
tester.fetcher.Notify("second", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout-time.Millisecond), nil, headerWrapper, bodyFetcher)
}
verifyImportEvent(t, imported, true) verifyImportEvent(t, imported, true)
} }
verifyImportDone(t, imported) verifyImportDone(t, imported)
@ -376,7 +336,6 @@ func testConcurrentAnnouncements(t *testing.T, protocol int) {
// Tests that announcements arriving while a previous is being fetched still // Tests that announcements arriving while a previous is being fetched still
// results in a valid import. // results in a valid import.
func TestOverlappingAnnouncements61(t *testing.T) { testOverlappingAnnouncements(t, 61) }
func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) } func TestOverlappingAnnouncements62(t *testing.T) { testOverlappingAnnouncements(t, 62) }
func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) } func TestOverlappingAnnouncements63(t *testing.T) { testOverlappingAnnouncements(t, 63) }
func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) } func TestOverlappingAnnouncements64(t *testing.T) { testOverlappingAnnouncements(t, 64) }
@ -387,7 +346,6 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@ -400,11 +358,7 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block } tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
for i := len(hashes) - 2; i >= 0; i-- { for i := len(hashes) - 2; i >= 0; i-- {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
select { select {
case <-imported: case <-imported:
case <-time.After(time.Second): case <-time.After(time.Second):
@ -416,7 +370,6 @@ func testOverlappingAnnouncements(t *testing.T, protocol int) {
} }
// Tests that announces already being retrieved will not be duplicated. // Tests that announces already being retrieved will not be duplicated.
func TestPendingDeduplication61(t *testing.T) { testPendingDeduplication(t, 61) }
func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) } func TestPendingDeduplication62(t *testing.T) { testPendingDeduplication(t, 62) }
func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) } func TestPendingDeduplication63(t *testing.T) { testPendingDeduplication(t, 63) }
func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) } func TestPendingDeduplication64(t *testing.T) { testPendingDeduplication(t, 64) }
@ -427,22 +380,11 @@ func testPendingDeduplication(t *testing.T, protocol int) {
// Assemble a tester with a built in counter and delayed fetcher // Assemble a tester with a built in counter and delayed fetcher
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
delay := 50 * time.Millisecond delay := 50 * time.Millisecond
counter := uint32(0) counter := uint32(0)
blockWrapper := func(hashes []common.Hash) error {
atomic.AddUint32(&counter, uint32(len(hashes)))
// Simulate a long running fetch
go func() {
time.Sleep(delay)
blockFetcher(hashes)
}()
return nil
}
headerWrapper := func(hash common.Hash) error { headerWrapper := func(hash common.Hash) error {
atomic.AddUint32(&counter, 1) atomic.AddUint32(&counter, 1)
@ -455,11 +397,7 @@ func testPendingDeduplication(t *testing.T, protocol int) {
} }
// Announce the same block many times until it's fetched (wait for any pending ops) // Announce the same block many times until it's fetched (wait for any pending ops)
for tester.getBlock(hashes[0]) == nil { for tester.getBlock(hashes[0]) == nil {
if protocol < 62 { tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), headerWrapper, bodyFetcher)
tester.fetcher.Notify("repeater", hashes[0], 0, time.Now().Add(-arriveTimeout), blockWrapper, nil, nil)
} else {
tester.fetcher.Notify("repeater", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerWrapper, bodyFetcher)
}
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
time.Sleep(delay) time.Sleep(delay)
@ -475,7 +413,6 @@ func testPendingDeduplication(t *testing.T, protocol int) {
// Tests that announcements retrieved in a random order are cached and eventually // Tests that announcements retrieved in a random order are cached and eventually
// imported when all the gaps are filled in. // imported when all the gaps are filled in.
func TestRandomArrivalImport61(t *testing.T) { testRandomArrivalImport(t, 61) }
func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) } func TestRandomArrivalImport62(t *testing.T) { testRandomArrivalImport(t, 62) }
func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) } func TestRandomArrivalImport63(t *testing.T) { testRandomArrivalImport(t, 63) }
func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) } func TestRandomArrivalImport64(t *testing.T) { testRandomArrivalImport(t, 64) }
@ -487,7 +424,6 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
skip := targetBlocks / 2 skip := targetBlocks / 2
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@ -497,26 +433,17 @@ func testRandomArrivalImport(t *testing.T, protocol int) {
for i := len(hashes) - 1; i >= 0; i-- { for i := len(hashes) - 1; i >= 0; i-- {
if i != skip { if i != skip {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
} }
// Finally announce the skipped entry and check full import // Finally announce the skipped entry and check full import
if protocol < 62 { tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[skip], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[skip], uint64(len(hashes)-skip-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
verifyImportCount(t, imported, len(hashes)-1) verifyImportCount(t, imported, len(hashes)-1)
} }
// Tests that direct block enqueues (due to block propagation vs. hash announce) // Tests that direct block enqueues (due to block propagation vs. hash announce)
// are correctly schedule, filling and import queue gaps. // are correctly schedule, filling and import queue gaps.
func TestQueueGapFill61(t *testing.T) { testQueueGapFill(t, 61) }
func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) } func TestQueueGapFill62(t *testing.T) { testQueueGapFill(t, 62) }
func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) } func TestQueueGapFill63(t *testing.T) { testQueueGapFill(t, 63) }
func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) } func TestQueueGapFill64(t *testing.T) { testQueueGapFill(t, 64) }
@ -528,7 +455,6 @@ func testQueueGapFill(t *testing.T, protocol int) {
skip := targetBlocks / 2 skip := targetBlocks / 2
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@ -538,11 +464,7 @@ func testQueueGapFill(t *testing.T, protocol int) {
for i := len(hashes) - 1; i >= 0; i-- { for i := len(hashes) - 1; i >= 0; i-- {
if i != skip { if i != skip {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
time.Sleep(time.Millisecond) time.Sleep(time.Millisecond)
} }
} }
@ -553,7 +475,6 @@ func testQueueGapFill(t *testing.T, protocol int) {
// Tests that blocks arriving from various sources (multiple propagations, hash // Tests that blocks arriving from various sources (multiple propagations, hash
// announces, etc) do not get scheduled for import multiple times. // announces, etc) do not get scheduled for import multiple times.
func TestImportDeduplication61(t *testing.T) { testImportDeduplication(t, 61) }
func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) } func TestImportDeduplication62(t *testing.T) { testImportDeduplication(t, 62) }
func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) } func TestImportDeduplication63(t *testing.T) { testImportDeduplication(t, 63) }
func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) } func TestImportDeduplication64(t *testing.T) { testImportDeduplication(t, 64) }
@ -564,7 +485,6 @@ func testImportDeduplication(t *testing.T, protocol int) {
// Create the tester and wrap the importer with a counter // Create the tester and wrap the importer with a counter
tester := newTester() tester := newTester()
blockFetcher := tester.makeBlockFetcher(blocks)
headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) headerFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
bodyFetcher := tester.makeBodyFetcher(blocks, 0) bodyFetcher := tester.makeBodyFetcher(blocks, 0)
@ -580,11 +500,7 @@ func testImportDeduplication(t *testing.T, protocol int) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block } tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
// Announce the duplicating block, wait for retrieval, and also propagate directly // Announce the duplicating block, wait for retrieval, and also propagate directly
if protocol < 62 { tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
tester.fetcher.Notify("valid", hashes[0], 0, time.Now().Add(-arriveTimeout), blockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher)
}
<-fetching <-fetching
tester.fetcher.Enqueue("valid", blocks[hashes[0]]) tester.fetcher.Enqueue("valid", blocks[hashes[0]])
@ -660,14 +576,14 @@ func testDistantAnnouncementDiscarding(t *testing.T, protocol int) {
tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} } tester.fetcher.fetchingHook = func(hashes []common.Hash) { fetching <- struct{}{} }
// Ensure that a block with a lower number than the threshold is discarded // Ensure that a block with a lower number than the threshold is discarded
tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) tester.fetcher.Notify("lower", hashes[low], blocks[hashes[low]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
case <-fetching: case <-fetching:
t.Fatalf("fetcher requested stale header") t.Fatalf("fetcher requested stale header")
} }
// Ensure that a block with a higher number than the threshold is discarded // Ensure that a block with a higher number than the threshold is discarded
tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) tester.fetcher.Notify("higher", hashes[high], blocks[hashes[high]].NumberU64(), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
select { select {
case <-time.After(50 * time.Millisecond): case <-time.After(50 * time.Millisecond):
case <-fetching: case <-fetching:
@ -693,7 +609,7 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
tester.fetcher.importedHook = func(block *types.Block) { imported <- block } tester.fetcher.importedHook = func(block *types.Block) { imported <- block }
// Announce a block with a bad number, check for immediate drop // Announce a block with a bad number, check for immediate drop
tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) tester.fetcher.Notify("bad", hashes[0], 2, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, false) verifyImportEvent(t, imported, false)
tester.lock.RLock() tester.lock.RLock()
@ -704,7 +620,7 @@ func testInvalidNumberAnnouncement(t *testing.T, protocol int) {
t.Fatalf("peer with invalid numbered announcement not dropped") t.Fatalf("peer with invalid numbered announcement not dropped")
} }
// Make sure a good announcement passes without a drop // Make sure a good announcement passes without a drop
tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) tester.fetcher.Notify("good", hashes[0], 1, time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
verifyImportEvent(t, imported, true) verifyImportEvent(t, imported, true)
tester.lock.RLock() tester.lock.RLock()
@ -743,7 +659,7 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
// Iteratively announce blocks until all are imported // Iteratively announce blocks until all are imported
for i := len(hashes) - 2; i >= 0; i-- { for i := len(hashes) - 2; i >= 0; i-- {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, headerFetcher, bodyFetcher) tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), headerFetcher, bodyFetcher)
// All announces should fetch the header // All announces should fetch the header
verifyFetchingEvent(t, fetching, true) verifyFetchingEvent(t, fetching, true)
@ -760,7 +676,6 @@ func testEmptyBlockShortCircuit(t *testing.T, protocol int) {
// Tests that a peer is unable to use unbounded memory with sending infinite // Tests that a peer is unable to use unbounded memory with sending infinite
// block announcements to a node, but that even in the face of such an attack, // block announcements to a node, but that even in the face of such an attack,
// the fetcher remains operational. // the fetcher remains operational.
func TestHashMemoryExhaustionAttack61(t *testing.T) { testHashMemoryExhaustionAttack(t, 61) }
func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) } func TestHashMemoryExhaustionAttack62(t *testing.T) { testHashMemoryExhaustionAttack(t, 62) }
func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) } func TestHashMemoryExhaustionAttack63(t *testing.T) { testHashMemoryExhaustionAttack(t, 63) }
func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) } func TestHashMemoryExhaustionAttack64(t *testing.T) { testHashMemoryExhaustionAttack(t, 64) }
@ -781,29 +696,19 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// Create a valid chain and an infinite junk chain // Create a valid chain and an infinite junk chain
targetBlocks := hashLimit + 2*maxQueueDist targetBlocks := hashLimit + 2*maxQueueDist
hashes, blocks := makeChain(targetBlocks, 0, genesis) hashes, blocks := makeChain(targetBlocks, 0, genesis)
validBlockFetcher := tester.makeBlockFetcher(blocks)
validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack) validHeaderFetcher := tester.makeHeaderFetcher(blocks, -gatherSlack)
validBodyFetcher := tester.makeBodyFetcher(blocks, 0) validBodyFetcher := tester.makeBodyFetcher(blocks, 0)
attack, _ := makeChain(targetBlocks, 0, unknownBlock) attack, _ := makeChain(targetBlocks, 0, unknownBlock)
attackerBlockFetcher := tester.makeBlockFetcher(nil)
attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack) attackerHeaderFetcher := tester.makeHeaderFetcher(nil, -gatherSlack)
attackerBodyFetcher := tester.makeBodyFetcher(nil, 0) attackerBodyFetcher := tester.makeBodyFetcher(nil, 0)
// Feed the tester a huge hashset from the attacker, and a limited from the valid peer // Feed the tester a huge hashset from the attacker, and a limited from the valid peer
for i := 0; i < len(attack); i++ { for i := 0; i < len(attack); i++ {
if i < maxQueueDist { if i < maxQueueDist {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), validHeaderFetcher, validBodyFetcher)
tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], 0, time.Now(), validBlockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[len(hashes)-2-i], uint64(i+1), time.Now(), nil, validHeaderFetcher, validBodyFetcher)
}
}
if protocol < 62 {
tester.fetcher.Notify("attacker", attack[i], 0, time.Now(), attackerBlockFetcher, nil, nil)
} else {
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), nil, attackerHeaderFetcher, attackerBodyFetcher)
} }
tester.fetcher.Notify("attacker", attack[i], 1 /* don't distance drop */, time.Now(), attackerHeaderFetcher, attackerBodyFetcher)
} }
if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist { if count := atomic.LoadInt32(&announces); count != hashLimit+maxQueueDist {
t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist) t.Fatalf("queued announce count mismatch: have %d, want %d", count, hashLimit+maxQueueDist)
@ -813,11 +718,7 @@ func testHashMemoryExhaustionAttack(t *testing.T, protocol int) {
// Feed the remaining valid hashes to ensure DOS protection state remains clean // Feed the remaining valid hashes to ensure DOS protection state remains clean
for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- { for i := len(hashes) - maxQueueDist - 2; i >= 0; i-- {
if protocol < 62 { tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), validHeaderFetcher, validBodyFetcher)
tester.fetcher.Notify("valid", hashes[i], 0, time.Now().Add(-arriveTimeout), validBlockFetcher, nil, nil)
} else {
tester.fetcher.Notify("valid", hashes[i], uint64(len(hashes)-i-1), time.Now().Add(-arriveTimeout), nil, validHeaderFetcher, validBodyFetcher)
}
verifyImportEvent(t, imported, true) verifyImportEvent(t, imported, true)
} }
verifyImportDone(t, imported) verifyImportDone(t, imported)

View File

@ -33,12 +33,9 @@ var (
propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop") propBroadcastDropMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/drop")
propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos") propBroadcastDOSMeter = metrics.NewMeter("eth/fetcher/prop/broadcasts/dos")
blockFetchMeter = metrics.NewMeter("eth/fetcher/fetch/blocks")
headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers") headerFetchMeter = metrics.NewMeter("eth/fetcher/fetch/headers")
bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies") bodyFetchMeter = metrics.NewMeter("eth/fetcher/fetch/bodies")
blockFilterInMeter = metrics.NewMeter("eth/fetcher/filter/blocks/in")
blockFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/blocks/out")
headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in") headerFilterInMeter = metrics.NewMeter("eth/fetcher/filter/headers/in")
headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out") headerFilterOutMeter = metrics.NewMeter("eth/fetcher/filter/headers/out")
bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in") bodyFilterInMeter = metrics.NewMeter("eth/fetcher/filter/bodies/in")

View File

@ -57,9 +57,6 @@ func errResp(code errCode, format string, v ...interface{}) error {
return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...)) return fmt.Errorf("%v - %v", code, fmt.Sprintf(format, v...))
} }
type hashFetcherFn func(common.Hash) error
type blockFetcherFn func([]common.Hash) error
type ProtocolManager struct { type ProtocolManager struct {
networkId int networkId int
@ -275,9 +272,11 @@ func (pm *ProtocolManager) handle(p *peer) error {
defer pm.removePeer(p.id) defer pm.removePeer(p.id)
// Register the peer in the downloader. If the downloader considers it banned, we disconnect // Register the peer in the downloader. If the downloader considers it banned, we disconnect
if err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(), err := pm.downloader.RegisterPeer(p.id, p.version, p.Head(),
p.RequestHashes, p.RequestHashesFromNumber, p.RequestBlocks, p.RequestHeadersByHash, p.RequestHeadersByHash, p.RequestHeadersByNumber,
p.RequestHeadersByNumber, p.RequestBodies, p.RequestReceipts, p.RequestNodeData); err != nil { p.RequestBodies, p.RequestReceipts, p.RequestNodeData,
)
if err != nil {
return err return err
} }
// Propagate existing transactions. new transactions appearing // Propagate existing transactions. new transactions appearing
@ -324,108 +323,8 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Status messages should never arrive after the handshake // Status messages should never arrive after the handshake
return errResp(ErrExtraStatusMsg, "uncontrolled status message") return errResp(ErrExtraStatusMsg, "uncontrolled status message")
case p.version < eth62 && msg.Code == GetBlockHashesMsg:
// Retrieve the number of hashes to return and from which origin hash
var request getBlockHashesData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = uint64(downloader.MaxHashFetch)
}
// Retrieve the hashes from the block chain and return them
hashes := pm.blockchain.GetBlockHashesFromHash(request.Hash, request.Amount)
if len(hashes) == 0 {
glog.V(logger.Debug).Infof("invalid block hash %x", request.Hash.Bytes()[:4])
}
return p.SendBlockHashes(hashes)
case p.version < eth62 && msg.Code == GetBlockHashesFromNumberMsg:
// Retrieve and decode the number of hashes to return and from which origin number
var request getBlockHashesFromNumberData
if err := msg.Decode(&request); err != nil {
return errResp(ErrDecode, "%v: %v", msg, err)
}
if request.Amount > uint64(downloader.MaxHashFetch) {
request.Amount = uint64(downloader.MaxHashFetch)
}
// Calculate the last block that should be retrieved, and short circuit if unavailable
last := pm.blockchain.GetBlockByNumber(request.Number + request.Amount - 1)
if last == nil {
last = pm.blockchain.CurrentBlock()
request.Amount = last.NumberU64() - request.Number + 1
}
if last.NumberU64() < request.Number {
return p.SendBlockHashes(nil)
}
// Retrieve the hashes from the last block backwards, reverse and return
hashes := []common.Hash{last.Hash()}
hashes = append(hashes, pm.blockchain.GetBlockHashesFromHash(last.Hash(), request.Amount-1)...)
for i := 0; i < len(hashes)/2; i++ {
hashes[i], hashes[len(hashes)-1-i] = hashes[len(hashes)-1-i], hashes[i]
}
return p.SendBlockHashes(hashes)
case p.version < eth62 && msg.Code == BlockHashesMsg:
// A batch of hashes arrived to one of our previous requests
var hashes []common.Hash
if err := msg.Decode(&hashes); err != nil {
break
}
// Deliver them all to the downloader for queuing
err := pm.downloader.DeliverHashes(p.id, hashes)
if err != nil {
glog.V(logger.Debug).Infoln(err)
}
case p.version < eth62 && msg.Code == GetBlocksMsg:
// Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil {
return err
}
// Gather blocks until the fetch or network limits is reached
var (
hash common.Hash
bytes common.StorageSize
blocks []*types.Block
)
for len(blocks) < downloader.MaxBlockFetch && bytes < softResponseLimit {
//Retrieve the hash of the next block
err := msgStream.Decode(&hash)
if err == rlp.EOL {
break
} else if err != nil {
return errResp(ErrDecode, "msg %v: %v", msg, err)
}
// Retrieve the requested block, stopping if enough was found
if block := pm.blockchain.GetBlockByHash(hash); block != nil {
blocks = append(blocks, block)
bytes += block.Size()
}
}
return p.SendBlocks(blocks)
case p.version < eth62 && msg.Code == BlocksMsg:
// Decode the arrived block message
var blocks []*types.Block
if err := msg.Decode(&blocks); err != nil {
glog.V(logger.Detail).Infoln("Decode error", err)
blocks = nil
}
// Update the receive timestamp of each block
for _, block := range blocks {
block.ReceivedAt = msg.ReceivedAt
block.ReceivedFrom = p
}
// Filter out any explicitly requested blocks, deliver the rest to the downloader
if blocks := pm.fetcher.FilterBlocks(blocks); len(blocks) > 0 {
pm.downloader.DeliverBlocks(p.id, blocks)
}
// Block header query, collect the requested headers and reply // Block header query, collect the requested headers and reply
case p.version >= eth62 && msg.Code == GetBlockHeadersMsg: case msg.Code == GetBlockHeadersMsg:
// Decode the complex header query // Decode the complex header query
var query getBlockHeadersData var query getBlockHeadersData
if err := msg.Decode(&query); err != nil { if err := msg.Decode(&query); err != nil {
@ -493,7 +392,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
return p.SendBlockHeaders(headers) return p.SendBlockHeaders(headers)
case p.version >= eth62 && msg.Code == BlockHeadersMsg: case msg.Code == BlockHeadersMsg:
// A batch of headers arrived to one of our previous requests // A batch of headers arrived to one of our previous requests
var headers []*types.Header var headers []*types.Header
if err := msg.Decode(&headers); err != nil { if err := msg.Decode(&headers); err != nil {
@ -545,7 +444,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
} }
case p.version >= eth62 && msg.Code == GetBlockBodiesMsg: case msg.Code == GetBlockBodiesMsg:
// Decode the retrieval message // Decode the retrieval message
msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
if _, err := msgStream.List(); err != nil { if _, err := msgStream.List(); err != nil {
@ -572,7 +471,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
return p.SendBlockBodiesRLP(bodies) return p.SendBlockBodiesRLP(bodies)
case p.version >= eth62 && msg.Code == BlockBodiesMsg: case msg.Code == BlockBodiesMsg:
// A batch of block bodies arrived to one of our previous requests // A batch of block bodies arrived to one of our previous requests
var request blockBodiesData var request blockBodiesData
if err := msg.Decode(&request); err != nil { if err := msg.Decode(&request); err != nil {
@ -723,11 +622,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
} }
} }
for _, block := range unknown { for _, block := range unknown {
if p.version < eth62 { pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestBlocks, nil, nil)
} else {
pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), nil, p.RequestOneHeader, p.RequestBodies)
}
} }
case msg.Code == NewBlockMsg: case msg.Code == NewBlockMsg:
@ -809,12 +704,8 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// Otherwise if the block is indeed in out own chain, announce it // Otherwise if the block is indeed in out own chain, announce it
if pm.blockchain.HasBlock(hash) { if pm.blockchain.HasBlock(hash) {
for _, peer := range peers { for _, peer := range peers {
if peer.version < eth62 {
peer.SendNewBlockHashes61([]common.Hash{hash})
} else {
peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()}) peer.SendNewBlockHashes([]common.Hash{hash}, []uint64{block.NumberU64()})
} }
}
glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt)) glog.V(logger.Detail).Infof("announced block %x to %d peers in %v", hash[:4], len(peers), time.Since(block.ReceivedAt))
} }
} }

View File

@ -63,160 +63,6 @@ func TestProtocolCompatibility(t *testing.T) {
} }
} }
// Tests that hashes can be retrieved from a remote chain by hashes in reverse
// order.
func TestGetBlockHashes61(t *testing.T) { testGetBlockHashes(t, 61) }
func testGetBlockHashes(t *testing.T, protocol int) {
pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil)
peer, _ := newTestPeer("peer", protocol, pm, true)
defer peer.close()
// Create a batch of tests for various scenarios
limit := downloader.MaxHashFetch
tests := []struct {
origin common.Hash
number int
result int
}{
{common.Hash{}, 1, 0}, // Make sure non existent hashes don't return results
{pm.blockchain.Genesis().Hash(), 1, 0}, // There are no hashes to retrieve up from the genesis
{pm.blockchain.GetBlockByNumber(5).Hash(), 5, 5}, // All the hashes including the genesis requested
{pm.blockchain.GetBlockByNumber(5).Hash(), 10, 5}, // More hashes than available till the genesis requested
{pm.blockchain.GetBlockByNumber(100).Hash(), 10, 10}, // All hashes available from the middle of the chain
{pm.blockchain.CurrentBlock().Hash(), 10, 10}, // All hashes available from the head of the chain
{pm.blockchain.CurrentBlock().Hash(), limit, limit}, // Request the maximum allowed hash count
{pm.blockchain.CurrentBlock().Hash(), limit + 1, limit}, // Request more than the maximum allowed hash count
}
// Run each of the tests and verify the results against the chain
for i, tt := range tests {
// Assemble the hash response we would like to receive
resp := make([]common.Hash, tt.result)
if len(resp) > 0 {
from := pm.blockchain.GetBlockByHash(tt.origin).NumberU64() - 1
for j := 0; j < len(resp); j++ {
resp[j] = pm.blockchain.GetBlockByNumber(uint64(int(from) - j)).Hash()
}
}
// Send the hash request and verify the response
p2p.Send(peer.app, 0x03, getBlockHashesData{tt.origin, uint64(tt.number)})
if err := p2p.ExpectMsg(peer.app, 0x04, resp); err != nil {
t.Errorf("test %d: block hashes mismatch: %v", i, err)
}
}
}
// Tests that hashes can be retrieved from a remote chain by numbers in forward
// order.
func TestGetBlockHashesFromNumber61(t *testing.T) { testGetBlockHashesFromNumber(t, 61) }
func testGetBlockHashesFromNumber(t *testing.T, protocol int) {
pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil)
peer, _ := newTestPeer("peer", protocol, pm, true)
defer peer.close()
// Create a batch of tests for various scenarios
limit := downloader.MaxHashFetch
tests := []struct {
origin uint64
number int
result int
}{
{pm.blockchain.CurrentBlock().NumberU64() + 1, 1, 0}, // Out of bounds requests should return empty
{pm.blockchain.CurrentBlock().NumberU64(), 1, 1}, // Make sure the head hash can be retrieved
{pm.blockchain.CurrentBlock().NumberU64() - 4, 5, 5}, // All hashes, including the head hash requested
{pm.blockchain.CurrentBlock().NumberU64() - 4, 10, 5}, // More hashes requested than available till the head
{pm.blockchain.CurrentBlock().NumberU64() - 100, 10, 10}, // All hashes available from the middle of the chain
{0, 10, 10}, // All hashes available from the root of the chain
{0, limit, limit}, // Request the maximum allowed hash count
{0, limit + 1, limit}, // Request more than the maximum allowed hash count
{0, 1, 1}, // Make sure the genesis hash can be retrieved
}
// Run each of the tests and verify the results against the chain
for i, tt := range tests {
// Assemble the hash response we would like to receive
resp := make([]common.Hash, tt.result)
for j := 0; j < len(resp); j++ {
resp[j] = pm.blockchain.GetBlockByNumber(tt.origin + uint64(j)).Hash()
}
// Send the hash request and verify the response
p2p.Send(peer.app, 0x08, getBlockHashesFromNumberData{tt.origin, uint64(tt.number)})
if err := p2p.ExpectMsg(peer.app, 0x04, resp); err != nil {
t.Errorf("test %d: block hashes mismatch: %v", i, err)
}
}
}
// Tests that blocks can be retrieved from a remote chain based on their hashes.
func TestGetBlocks61(t *testing.T) { testGetBlocks(t, 61) }
func testGetBlocks(t *testing.T, protocol int) {
pm := newTestProtocolManagerMust(t, false, downloader.MaxHashFetch+15, nil, nil)
peer, _ := newTestPeer("peer", protocol, pm, true)
defer peer.close()
// Create a batch of tests for various scenarios
limit := downloader.MaxBlockFetch
tests := []struct {
random int // Number of blocks to fetch randomly from the chain
explicit []common.Hash // Explicitly requested blocks
available []bool // Availability of explicitly requested blocks
expected int // Total number of existing blocks to expect
}{
{1, nil, nil, 1}, // A single random block should be retrievable
{10, nil, nil, 10}, // Multiple random blocks should be retrievable
{limit, nil, nil, limit}, // The maximum possible blocks should be retrievable
{limit + 1, nil, nil, limit}, // No more than the possible block count should be returned
{0, []common.Hash{pm.blockchain.Genesis().Hash()}, []bool{true}, 1}, // The genesis block should be retrievable
{0, []common.Hash{pm.blockchain.CurrentBlock().Hash()}, []bool{true}, 1}, // The chains head block should be retrievable
{0, []common.Hash{common.Hash{}}, []bool{false}, 0}, // A non existent block should not be returned
// Existing and non-existing blocks interleaved should not cause problems
{0, []common.Hash{
common.Hash{},
pm.blockchain.GetBlockByNumber(1).Hash(),
common.Hash{},
pm.blockchain.GetBlockByNumber(10).Hash(),
common.Hash{},
pm.blockchain.GetBlockByNumber(100).Hash(),
common.Hash{},
}, []bool{false, true, false, true, false, true, false}, 3},
}
// Run each of the tests and verify the results against the chain
for i, tt := range tests {
// Collect the hashes to request, and the response to expect
hashes, seen := []common.Hash{}, make(map[int64]bool)
blocks := []*types.Block{}
for j := 0; j < tt.random; j++ {
for {
num := rand.Int63n(int64(pm.blockchain.CurrentBlock().NumberU64()))
if !seen[num] {
seen[num] = true
block := pm.blockchain.GetBlockByNumber(uint64(num))
hashes = append(hashes, block.Hash())
if len(blocks) < tt.expected {
blocks = append(blocks, block)
}
break
}
}
}
for j, hash := range tt.explicit {
hashes = append(hashes, hash)
if tt.available[j] && len(blocks) < tt.expected {
blocks = append(blocks, pm.blockchain.GetBlockByHash(hash))
}
}
// Send the hash request and verify the response
p2p.Send(peer.app, 0x05, hashes)
if err := p2p.ExpectMsg(peer.app, 0x06, blocks); err != nil {
t.Errorf("test %d: blocks mismatch: %v", i, err)
}
}
}
// Tests that block headers can be retrieved from a remote chain based on user queries. // Tests that block headers can be retrieved from a remote chain based on user queries.
func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) } func TestGetBlockHeaders62(t *testing.T) { testGetBlockHeaders(t, 62) }
func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) } func TestGetBlockHeaders63(t *testing.T) { testGetBlockHeaders(t, 63) }

View File

@ -34,14 +34,6 @@ var (
propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic") propBlockInTrafficMeter = metrics.NewMeter("eth/prop/blocks/in/traffic")
propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets") propBlockOutPacketsMeter = metrics.NewMeter("eth/prop/blocks/out/packets")
propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic") propBlockOutTrafficMeter = metrics.NewMeter("eth/prop/blocks/out/traffic")
reqHashInPacketsMeter = metrics.NewMeter("eth/req/hashes/in/packets")
reqHashInTrafficMeter = metrics.NewMeter("eth/req/hashes/in/traffic")
reqHashOutPacketsMeter = metrics.NewMeter("eth/req/hashes/out/packets")
reqHashOutTrafficMeter = metrics.NewMeter("eth/req/hashes/out/traffic")
reqBlockInPacketsMeter = metrics.NewMeter("eth/req/blocks/in/packets")
reqBlockInTrafficMeter = metrics.NewMeter("eth/req/blocks/in/traffic")
reqBlockOutPacketsMeter = metrics.NewMeter("eth/req/blocks/out/packets")
reqBlockOutTrafficMeter = metrics.NewMeter("eth/req/blocks/out/traffic")
reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets") reqHeaderInPacketsMeter = metrics.NewMeter("eth/req/headers/in/packets")
reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic") reqHeaderInTrafficMeter = metrics.NewMeter("eth/req/headers/in/traffic")
reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets") reqHeaderOutPacketsMeter = metrics.NewMeter("eth/req/headers/out/packets")
@ -95,14 +87,9 @@ func (rw *meteredMsgReadWriter) ReadMsg() (p2p.Msg, error) {
// Account for the data traffic // Account for the data traffic
packets, traffic := miscInPacketsMeter, miscInTrafficMeter packets, traffic := miscInPacketsMeter, miscInTrafficMeter
switch { switch {
case rw.version < eth62 && msg.Code == BlockHashesMsg: case msg.Code == BlockHeadersMsg:
packets, traffic = reqHashInPacketsMeter, reqHashInTrafficMeter
case rw.version < eth62 && msg.Code == BlocksMsg:
packets, traffic = reqBlockInPacketsMeter, reqBlockInTrafficMeter
case rw.version >= eth62 && msg.Code == BlockHeadersMsg:
packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter packets, traffic = reqHeaderInPacketsMeter, reqHeaderInTrafficMeter
case rw.version >= eth62 && msg.Code == BlockBodiesMsg: case msg.Code == BlockBodiesMsg:
packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter packets, traffic = reqBodyInPacketsMeter, reqBodyInTrafficMeter
case rw.version >= eth63 && msg.Code == NodeDataMsg: case rw.version >= eth63 && msg.Code == NodeDataMsg:
@ -127,14 +114,9 @@ func (rw *meteredMsgReadWriter) WriteMsg(msg p2p.Msg) error {
// Account for the data traffic // Account for the data traffic
packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter packets, traffic := miscOutPacketsMeter, miscOutTrafficMeter
switch { switch {
case rw.version < eth62 && msg.Code == BlockHashesMsg: case msg.Code == BlockHeadersMsg:
packets, traffic = reqHashOutPacketsMeter, reqHashOutTrafficMeter
case rw.version < eth62 && msg.Code == BlocksMsg:
packets, traffic = reqBlockOutPacketsMeter, reqBlockOutTrafficMeter
case rw.version >= eth62 && msg.Code == BlockHeadersMsg:
packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter packets, traffic = reqHeaderOutPacketsMeter, reqHeaderOutTrafficMeter
case rw.version >= eth62 && msg.Code == BlockBodiesMsg: case msg.Code == BlockBodiesMsg:
packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter packets, traffic = reqBodyOutPacketsMeter, reqBodyOutTrafficMeter
case rw.version >= eth63 && msg.Code == NodeDataMsg: case rw.version >= eth63 && msg.Code == NodeDataMsg:

View File

@ -25,7 +25,6 @@ import (
"github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/eth/downloader"
"github.com/ethereum/go-ethereum/logger" "github.com/ethereum/go-ethereum/logger"
"github.com/ethereum/go-ethereum/logger/glog" "github.com/ethereum/go-ethereum/logger/glog"
"github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/p2p"
@ -154,25 +153,6 @@ func (p *peer) SendTransactions(txs types.Transactions) error {
return p2p.Send(p.rw, TxMsg, txs) return p2p.Send(p.rw, TxMsg, txs)
} }
// SendBlockHashes sends a batch of known hashes to the remote peer.
func (p *peer) SendBlockHashes(hashes []common.Hash) error {
return p2p.Send(p.rw, BlockHashesMsg, hashes)
}
// SendBlocks sends a batch of blocks to the remote peer.
func (p *peer) SendBlocks(blocks []*types.Block) error {
return p2p.Send(p.rw, BlocksMsg, blocks)
}
// SendNewBlockHashes61 announces the availability of a number of blocks through
// a hash notification.
func (p *peer) SendNewBlockHashes61(hashes []common.Hash) error {
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
return p2p.Send(p.rw, NewBlockHashesMsg, hashes)
}
// SendNewBlockHashes announces the availability of a number of blocks through // SendNewBlockHashes announces the availability of a number of blocks through
// a hash notification. // a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
@ -221,26 +201,6 @@ func (p *peer) SendReceiptsRLP(receipts []rlp.RawValue) error {
return p2p.Send(p.rw, ReceiptsMsg, receipts) return p2p.Send(p.rw, ReceiptsMsg, receipts)
} }
// RequestHashes fetches a batch of hashes from a peer, starting at from, going
// towards the genesis block.
func (p *peer) RequestHashes(from common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from %x...", p, downloader.MaxHashFetch, from[:4])
return p2p.Send(p.rw, GetBlockHashesMsg, getBlockHashesData{from, uint64(downloader.MaxHashFetch)})
}
// RequestHashesFromNumber fetches a batch of hashes from a peer, starting at
// the requested block number, going upwards towards the genesis block.
func (p *peer) RequestHashesFromNumber(from uint64, count int) error {
glog.V(logger.Debug).Infof("%v fetching hashes (%d) from #%d...", p, count, from)
return p2p.Send(p.rw, GetBlockHashesFromNumberMsg, getBlockHashesFromNumberData{from, uint64(count)})
}
// RequestBlocks fetches a batch of blocks corresponding to the specified hashes.
func (p *peer) RequestBlocks(hashes []common.Hash) error {
glog.V(logger.Debug).Infof("%v fetching %v blocks", p, len(hashes))
return p2p.Send(p.rw, GetBlocksMsg, hashes)
}
// RequestHeaders is a wrapper around the header query functions to fetch a // RequestHeaders is a wrapper around the header query functions to fetch a
// single header. It is used solely by the fetcher. // single header. It is used solely by the fetcher.
func (p *peer) RequestOneHeader(hash common.Hash) error { func (p *peer) RequestOneHeader(hash common.Hash) error {

View File

@ -28,7 +28,6 @@ import (
// Constants to match up protocol versions and messages // Constants to match up protocol versions and messages
const ( const (
eth61 = 61
eth62 = 62 eth62 = 62
eth63 = 63 eth63 = 63
) )
@ -49,26 +48,15 @@ const (
// eth protocol message codes // eth protocol message codes
const ( const (
// Protocol messages belonging to eth/61 // Protocol messages belonging to eth/62
StatusMsg = 0x00 StatusMsg = 0x00
NewBlockHashesMsg = 0x01 NewBlockHashesMsg = 0x01
TxMsg = 0x02 TxMsg = 0x02
GetBlockHashesMsg = 0x03
BlockHashesMsg = 0x04
GetBlocksMsg = 0x05
BlocksMsg = 0x06
NewBlockMsg = 0x07
GetBlockHashesFromNumberMsg = 0x08
// Protocol messages belonging to eth/62 (new protocol from scratch)
// StatusMsg = 0x00 (uncomment after eth/61 deprecation)
// NewBlockHashesMsg = 0x01 (uncomment after eth/61 deprecation)
// TxMsg = 0x02 (uncomment after eth/61 deprecation)
GetBlockHeadersMsg = 0x03 GetBlockHeadersMsg = 0x03
BlockHeadersMsg = 0x04 BlockHeadersMsg = 0x04
GetBlockBodiesMsg = 0x05 GetBlockBodiesMsg = 0x05
BlockBodiesMsg = 0x06 BlockBodiesMsg = 0x06
// NewBlockMsg = 0x07 (uncomment after eth/61 deprecation) NewBlockMsg = 0x07
// Protocol messages belonging to eth/63 // Protocol messages belonging to eth/63
GetNodeDataMsg = 0x0d GetNodeDataMsg = 0x0d
@ -117,12 +105,6 @@ type txPool interface {
GetTransactions() types.Transactions GetTransactions() types.Transactions
} }
type chainManager interface {
GetBlockHashesFromHash(hash common.Hash, amount uint64) (hashes []common.Hash)
GetBlock(hash common.Hash) (block *types.Block)
Status() (td *big.Int, currentBlock common.Hash, genesisBlock common.Hash)
}
// statusData is the network packet for the status message. // statusData is the network packet for the status message.
type statusData struct { type statusData struct {
ProtocolVersion uint32 ProtocolVersion uint32
@ -138,19 +120,6 @@ type newBlockHashesData []struct {
Number uint64 // Number of one particular block being announced Number uint64 // Number of one particular block being announced
} }
// getBlockHashesData is the network packet for the hash based hash retrieval.
type getBlockHashesData struct {
Hash common.Hash
Amount uint64
}
// getBlockHashesFromNumberData is the network packet for the number based hash
// retrieval.
type getBlockHashesFromNumberData struct {
Number uint64
Amount uint64
}
// getBlockHeadersData represents a block header query. // getBlockHeadersData represents a block header query.
type getBlockHeadersData struct { type getBlockHeadersData struct {
Origin hashOrNumber // Block from which to retrieve headers Origin hashOrNumber // Block from which to retrieve headers
@ -209,8 +178,3 @@ type blockBody struct {
// blockBodiesData is the network packet for block content distribution. // blockBodiesData is the network packet for block content distribution.
type blockBodiesData []*blockBody type blockBodiesData []*blockBody
// nodeDataData is the network response packet for a node data retrieval.
type nodeDataData []struct {
Value []byte
}

View File

@ -37,7 +37,6 @@ func init() {
var testAccount, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291") var testAccount, _ = crypto.HexToECDSA("b71c71a67e1177ad4e901695e1b4b9ee17ae16c6668d313eac2f96dbcda3f291")
// Tests that handshake failures are detected and reported correctly. // Tests that handshake failures are detected and reported correctly.
func TestStatusMsgErrors61(t *testing.T) { testStatusMsgErrors(t, 61) }
func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) } func TestStatusMsgErrors62(t *testing.T) { testStatusMsgErrors(t, 62) }
func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) } func TestStatusMsgErrors63(t *testing.T) { testStatusMsgErrors(t, 63) }
@ -90,7 +89,6 @@ func testStatusMsgErrors(t *testing.T, protocol int) {
} }
// This test checks that received transactions are added to the local pool. // This test checks that received transactions are added to the local pool.
func TestRecvTransactions61(t *testing.T) { testRecvTransactions(t, 61) }
func TestRecvTransactions62(t *testing.T) { testRecvTransactions(t, 62) } func TestRecvTransactions62(t *testing.T) { testRecvTransactions(t, 62) }
func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) }
@ -119,7 +117,6 @@ func testRecvTransactions(t *testing.T, protocol int) {
} }
// This test checks that pending transactions are sent. // This test checks that pending transactions are sent.
func TestSendTransactions61(t *testing.T) { testSendTransactions(t, 61) }
func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) } func TestSendTransactions62(t *testing.T) { testSendTransactions(t, 62) }
func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) } func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) }