diff --git a/core/tx_pool.go b/core/tx_pool.go
index ae6962c5d..16d80d644 100644
--- a/core/tx_pool.go
+++ b/core/tx_pool.go
@@ -864,6 +864,12 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction {
return pool.all.Get(hash)
}
+// Has returns an indicator whether txpool has a transaction cached with the
+// given hash.
+func (pool *TxPool) Has(hash common.Hash) bool {
+ return pool.all.Get(hash) != nil
+}
+
// removeTx removes a single transaction from the queue, moving all subsequent
// transactions back to the future queue.
func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) {
diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go
index 60f86d0e1..5c2020d7d 100644
--- a/eth/downloader/peer.go
+++ b/eth/downloader/peer.go
@@ -470,7 +470,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.headerThroughput
}
- return ps.idlePeers(62, 64, idle, throughput)
+ return ps.idlePeers(62, 65, idle, throughput)
}
// BodyIdlePeers retrieves a flat list of all the currently body-idle peers within
@@ -484,7 +484,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.blockThroughput
}
- return ps.idlePeers(62, 64, idle, throughput)
+ return ps.idlePeers(62, 65, idle, throughput)
}
// ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers
@@ -498,7 +498,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.receiptThroughput
}
- return ps.idlePeers(63, 64, idle, throughput)
+ return ps.idlePeers(63, 65, idle, throughput)
}
// NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle
@@ -512,7 +512,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) {
defer p.lock.RUnlock()
return p.stateThroughput
}
- return ps.idlePeers(63, 64, idle, throughput)
+ return ps.idlePeers(63, 65, idle, throughput)
}
// idlePeers retrieves a flat list of all currently idle peers satisfying the
diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/block_fetcher.go
similarity index 84%
rename from eth/fetcher/fetcher.go
rename to eth/fetcher/block_fetcher.go
index 28c532d9b..7395ec83d 100644
--- a/eth/fetcher/fetcher.go
+++ b/eth/fetcher/block_fetcher.go
@@ -14,7 +14,7 @@
// You should have received a copy of the GNU Lesser General Public License
// along with the go-ethereum library. If not, see .
-// Package fetcher contains the block announcement based synchronisation.
+// Package fetcher contains the announcement based blocks or transaction synchronisation.
package fetcher
import (
@@ -30,13 +30,16 @@ import (
)
const (
- arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested
+ arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested
gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches
- fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block
- maxUncleDist = 7 // Maximum allowed backward distance from the chain head
- maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
- hashLimit = 256 // Maximum number of unique blocks a peer may have announced
- blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
+ fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction
+)
+
+const (
+ maxUncleDist = 7 // Maximum allowed backward distance from the chain head
+ maxQueueDist = 32 // Maximum allowed distance from the chain head to queue
+ hashLimit = 256 // Maximum number of unique blocks a peer may have announced
+ blockLimit = 64 // Maximum number of unique blocks a peer may have delivered
)
var (
@@ -67,9 +70,9 @@ type chainInsertFn func(types.Blocks) (int, error)
// peerDropFn is a callback type for dropping a peer detected as malicious.
type peerDropFn func(id string)
-// announce is the hash notification of the availability of a new block in the
+// blockAnnounce is the hash notification of the availability of a new block in the
// network.
-type announce struct {
+type blockAnnounce struct {
hash common.Hash // Hash of the block being announced
number uint64 // Number of the block being announced (0 = unknown | old protocol)
header *types.Header // Header of the block partially reassembled (new protocol)
@@ -97,18 +100,18 @@ type bodyFilterTask struct {
time time.Time // Arrival time of the blocks' contents
}
-// inject represents a schedules import operation.
-type inject struct {
+// blockInject represents a schedules import operation.
+type blockInject struct {
origin string
block *types.Block
}
-// Fetcher is responsible for accumulating block announcements from various peers
+// BlockFetcher is responsible for accumulating block announcements from various peers
// and scheduling them for retrieval.
-type Fetcher struct {
+type BlockFetcher struct {
// Various event channels
- notify chan *announce
- inject chan *inject
+ notify chan *blockAnnounce
+ inject chan *blockInject
headerFilter chan chan *headerFilterTask
bodyFilter chan chan *bodyFilterTask
@@ -117,16 +120,16 @@ type Fetcher struct {
quit chan struct{}
// Announce states
- announces map[string]int // Per peer announce counts to prevent memory exhaustion
- announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching
- fetching map[common.Hash]*announce // Announced blocks, currently fetching
- fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval
- completing map[common.Hash]*announce // Blocks with headers, currently body-completing
+ announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion
+ announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching
+ fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching
+ fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval
+ completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing
// Block cache
- queue *prque.Prque // Queue containing the import operations (block number sorted)
- queues map[string]int // Per peer block counts to prevent memory exhaustion
- queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports)
+ queue *prque.Prque // Queue containing the import operations (block number sorted)
+ queues map[string]int // Per peer block counts to prevent memory exhaustion
+ queued map[common.Hash]*blockInject // Set of already queued blocks (to dedupe imports)
// Callbacks
getBlock blockRetrievalFn // Retrieves a block from the local chain
@@ -137,30 +140,30 @@ type Fetcher struct {
dropPeer peerDropFn // Drops a peer for misbehaving
// Testing hooks
- announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list
+ announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list
queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue
fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch
completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62)
importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62)
}
-// New creates a block fetcher to retrieve blocks based on hash announcements.
-func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher {
- return &Fetcher{
- notify: make(chan *announce),
- inject: make(chan *inject),
+// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements.
+func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher {
+ return &BlockFetcher{
+ notify: make(chan *blockAnnounce),
+ inject: make(chan *blockInject),
headerFilter: make(chan chan *headerFilterTask),
bodyFilter: make(chan chan *bodyFilterTask),
done: make(chan common.Hash),
quit: make(chan struct{}),
announces: make(map[string]int),
- announced: make(map[common.Hash][]*announce),
- fetching: make(map[common.Hash]*announce),
- fetched: make(map[common.Hash][]*announce),
- completing: make(map[common.Hash]*announce),
+ announced: make(map[common.Hash][]*blockAnnounce),
+ fetching: make(map[common.Hash]*blockAnnounce),
+ fetched: make(map[common.Hash][]*blockAnnounce),
+ completing: make(map[common.Hash]*blockAnnounce),
queue: prque.New(nil),
queues: make(map[string]int),
- queued: make(map[common.Hash]*inject),
+ queued: make(map[common.Hash]*blockInject),
getBlock: getBlock,
verifyHeader: verifyHeader,
broadcastBlock: broadcastBlock,
@@ -172,21 +175,21 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc
// Start boots up the announcement based synchroniser, accepting and processing
// hash notifications and block fetches until termination requested.
-func (f *Fetcher) Start() {
+func (f *BlockFetcher) Start() {
go f.loop()
}
// Stop terminates the announcement based synchroniser, canceling all pending
// operations.
-func (f *Fetcher) Stop() {
+func (f *BlockFetcher) Stop() {
close(f.quit)
}
// Notify announces the fetcher of the potential availability of a new block in
// the network.
-func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
+func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time,
headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error {
- block := &announce{
+ block := &blockAnnounce{
hash: hash,
number: number,
time: time,
@@ -203,8 +206,8 @@ func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time
}
// Enqueue tries to fill gaps the fetcher's future import queue.
-func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
- op := &inject{
+func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error {
+ op := &blockInject{
origin: peer,
block: block,
}
@@ -218,7 +221,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error {
// FilterHeaders extracts all the headers that were explicitly requested by the fetcher,
// returning those that should be handled differently.
-func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
+func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header {
log.Trace("Filtering headers", "peer", peer, "headers", len(headers))
// Send the filter channel to the fetcher
@@ -246,7 +249,7 @@ func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.
// FilterBodies extracts all the block bodies that were explicitly requested by
// the fetcher, returning those that should be handled differently.
-func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
+func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) {
log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles))
// Send the filter channel to the fetcher
@@ -274,7 +277,7 @@ func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction,
// Loop is the main fetcher loop, checking and processing various notification
// events.
-func (f *Fetcher) loop() {
+func (f *BlockFetcher) loop() {
// Iterate the block fetching until a quit is requested
fetchTimer := time.NewTimer(0)
completeTimer := time.NewTimer(0)
@@ -289,7 +292,7 @@ func (f *Fetcher) loop() {
// Import any queued blocks that could potentially fit
height := f.chainHeight()
for !f.queue.Empty() {
- op := f.queue.PopItem().(*inject)
+ op := f.queue.PopItem().(*blockInject)
hash := op.block.Hash()
if f.queueChangeHook != nil {
f.queueChangeHook(hash, false)
@@ -313,24 +316,24 @@ func (f *Fetcher) loop() {
// Wait for an outside event to occur
select {
case <-f.quit:
- // Fetcher terminating, abort all operations
+ // BlockFetcher terminating, abort all operations
return
case notification := <-f.notify:
// A block was announced, make sure the peer isn't DOSing us
- propAnnounceInMeter.Mark(1)
+ blockAnnounceInMeter.Mark(1)
count := f.announces[notification.origin] + 1
if count > hashLimit {
log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit)
- propAnnounceDOSMeter.Mark(1)
+ blockAnnounceDOSMeter.Mark(1)
break
}
// If we have a valid block number, check that it's potentially useful
if notification.number > 0 {
if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist)
- propAnnounceDropMeter.Mark(1)
+ blockAnnounceDropMeter.Mark(1)
break
}
}
@@ -352,7 +355,7 @@ func (f *Fetcher) loop() {
case op := <-f.inject:
// A direct block insertion was requested, try and fill any pending gaps
- propBroadcastInMeter.Mark(1)
+ blockBroadcastInMeter.Mark(1)
f.enqueue(op.origin, op.block)
case hash := <-f.done:
@@ -439,7 +442,7 @@ func (f *Fetcher) loop() {
// Split the batch of headers into unknown ones (to return to the caller),
// known incomplete ones (requiring body retrievals) and completed blocks.
- unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{}
+ unknown, incomplete, complete := []*types.Header{}, []*blockAnnounce{}, []*types.Block{}
for _, header := range task.headers {
hash := header.Hash()
@@ -475,7 +478,7 @@ func (f *Fetcher) loop() {
f.forgetHash(hash)
}
} else {
- // Fetcher doesn't know about it, add to the return list
+ // BlockFetcher doesn't know about it, add to the return list
unknown = append(unknown, header)
}
}
@@ -562,8 +565,8 @@ func (f *Fetcher) loop() {
}
}
-// rescheduleFetch resets the specified fetch timer to the next announce timeout.
-func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
+// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
+func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) {
// Short circuit if no blocks are announced
if len(f.announced) == 0 {
return
@@ -579,7 +582,7 @@ func (f *Fetcher) rescheduleFetch(fetch *time.Timer) {
}
// rescheduleComplete resets the specified completion timer to the next fetch timeout.
-func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
+func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) {
// Short circuit if no headers are fetched
if len(f.fetched) == 0 {
return
@@ -596,27 +599,27 @@ func (f *Fetcher) rescheduleComplete(complete *time.Timer) {
// enqueue schedules a new future import operation, if the block to be imported
// has not yet been seen.
-func (f *Fetcher) enqueue(peer string, block *types.Block) {
+func (f *BlockFetcher) enqueue(peer string, block *types.Block) {
hash := block.Hash()
// Ensure the peer isn't DOSing us
count := f.queues[peer] + 1
if count > blockLimit {
log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit)
- propBroadcastDOSMeter.Mark(1)
+ blockBroadcastDOSMeter.Mark(1)
f.forgetHash(hash)
return
}
// Discard any past or too distant blocks
if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist {
log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist)
- propBroadcastDropMeter.Mark(1)
+ blockBroadcastDropMeter.Mark(1)
f.forgetHash(hash)
return
}
// Schedule the block for future importing
if _, ok := f.queued[hash]; !ok {
- op := &inject{
+ op := &blockInject{
origin: peer,
block: block,
}
@@ -633,7 +636,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) {
// insert spawns a new goroutine to run a block insertion into the chain. If the
// block's number is at the same height as the current import phase, it updates
// the phase states accordingly.
-func (f *Fetcher) insert(peer string, block *types.Block) {
+func (f *BlockFetcher) insert(peer string, block *types.Block) {
hash := block.Hash()
// Run the import on a new thread
@@ -651,7 +654,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
switch err := f.verifyHeader(block.Header()); err {
case nil:
// All ok, quickly propagate to our peers
- propBroadcastOutTimer.UpdateSince(block.ReceivedAt)
+ blockBroadcastOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, true)
case consensus.ErrFutureBlock:
@@ -669,7 +672,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
return
}
// If import succeeded, broadcast the block
- propAnnounceOutTimer.UpdateSince(block.ReceivedAt)
+ blockAnnounceOutTimer.UpdateSince(block.ReceivedAt)
go f.broadcastBlock(block, false)
// Invoke the testing hook if needed
@@ -681,7 +684,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) {
// forgetHash removes all traces of a block announcement from the fetcher's
// internal state.
-func (f *Fetcher) forgetHash(hash common.Hash) {
+func (f *BlockFetcher) forgetHash(hash common.Hash) {
// Remove all pending announces and decrement DOS counters
for _, announce := range f.announced[hash] {
f.announces[announce.origin]--
@@ -723,7 +726,7 @@ func (f *Fetcher) forgetHash(hash common.Hash) {
// forgetBlock removes all traces of a queued block from the fetcher's internal
// state.
-func (f *Fetcher) forgetBlock(hash common.Hash) {
+func (f *BlockFetcher) forgetBlock(hash common.Hash) {
if insert := f.queued[hash]; insert != nil {
f.queues[insert.origin]--
if f.queues[insert.origin] == 0 {
diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/block_fetcher_test.go
similarity index 99%
rename from eth/fetcher/fetcher_test.go
rename to eth/fetcher/block_fetcher_test.go
index 83172c534..038ead12e 100644
--- a/eth/fetcher/fetcher_test.go
+++ b/eth/fetcher/block_fetcher_test.go
@@ -76,7 +76,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common
// fetcherTester is a test simulator for mocking out local block chain.
type fetcherTester struct {
- fetcher *Fetcher
+ fetcher *BlockFetcher
hashes []common.Hash // Hash chain belonging to the tester
blocks map[common.Hash]*types.Block // Blocks belonging to the tester
@@ -92,7 +92,7 @@ func newTester() *fetcherTester {
blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis},
drops: make(map[string]bool),
}
- tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
+ tester.fetcher = NewBlockFetcher(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer)
tester.fetcher.Start()
return tester
diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go
index d68d12f00..b75889938 100644
--- a/eth/fetcher/metrics.go
+++ b/eth/fetcher/metrics.go
@@ -23,15 +23,15 @@ import (
)
var (
- propAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/in", nil)
- propAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/announces/out", nil)
- propAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/drop", nil)
- propAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/dos", nil)
+ blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/in", nil)
+ blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/announces/out", nil)
+ blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/drop", nil)
+ blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/dos", nil)
- propBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/in", nil)
- propBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/broadcasts/out", nil)
- propBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/drop", nil)
- propBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/dos", nil)
+ blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/in", nil)
+ blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/broadcasts/out", nil)
+ blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/drop", nil)
+ blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/dos", nil)
headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/headers", nil)
bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/bodies", nil)
@@ -40,4 +40,15 @@ var (
headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/headers/out", nil)
bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/in", nil)
bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/out", nil)
+
+ txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/in", nil)
+ txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/dos", nil)
+ txAnnounceSkipMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/skip", nil)
+ txAnnounceUnderpriceMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/underprice", nil)
+ txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/broadcasts/in", nil)
+ txFetchOutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/out", nil)
+ txFetchSuccessMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/success", nil)
+ txFetchTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/timeout", nil)
+ txFetchInvalidMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/invalid", nil)
+ txFetchDurationTimer = metrics.NewRegisteredTimer("eth/fetcher/fetch/transaction/duration", nil)
)
diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go
new file mode 100644
index 000000000..1dabb0819
--- /dev/null
+++ b/eth/fetcher/tx_fetcher.go
@@ -0,0 +1,319 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package fetcher
+
+import (
+ "math/rand"
+ "time"
+
+ mapset "github.com/deckarep/golang-set"
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/log"
+)
+
+var (
+ // txAnnounceLimit is the maximum number of unique transaction a peer
+ // can announce in a short time.
+ txAnnounceLimit = 4096
+
+ // txFetchTimeout is the maximum allotted time to return an explicitly
+ // requested transaction.
+ txFetchTimeout = 5 * time.Second
+
+ // MaxTransactionFetch is the maximum transaction number can be fetched
+ // in one request. The rationale to pick this value is:
+ // In eth protocol, the softResponseLimit is 2MB. Nowdays according to
+ // Etherscan the average transaction size is around 200B, so in theory
+ // we can include lots of transaction in a single protocol packet. However
+ // the maximum size of a single transaction is raised to 128KB, so pick
+ // a middle value here to ensure we can maximize the efficiency of the
+ // retrieval and response size overflow won't happen in most cases.
+ MaxTransactionFetch = 256
+
+ // underpriceSetSize is the size of underprice set which used for maintaining
+ // the set of underprice transactions.
+ underpriceSetSize = 4096
+)
+
+// txAnnounce is the notification of the availability of a single
+// new transaction in the network.
+type txAnnounce struct {
+ origin string // Identifier of the peer originating the notification
+ time time.Time // Timestamp of the announcement
+ fetchTxs func([]common.Hash) // Callback for retrieving transaction from specified peer
+}
+
+// txsAnnounce is the notification of the availability of a batch
+// of new transactions in the network.
+type txsAnnounce struct {
+ hashes []common.Hash // Batch of transaction hashes being announced
+ origin string // Identifier of the peer originating the notification
+ time time.Time // Timestamp of the announcement
+ fetchTxs func([]common.Hash) // Callback for retrieving transaction from specified peer
+}
+
+// TxFetcher is responsible for retrieving new transaction based
+// on the announcement.
+type TxFetcher struct {
+ notify chan *txsAnnounce
+ cleanup chan []common.Hash
+ quit chan struct{}
+
+ // Announce states
+ announces map[string]int // Per peer transaction announce counts to prevent memory exhaustion
+ announced map[common.Hash][]*txAnnounce // Announced transactions, scheduled for fetching
+ fetching map[common.Hash]*txAnnounce // Announced transactions, currently fetching
+ underpriced mapset.Set // Transaction set whose price is too low for accepting
+
+ // Callbacks
+ hasTx func(common.Hash) bool // Retrieves a tx from the local txpool
+ addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool
+ dropPeer func(string) // Drop the specified peer
+
+ // Hooks
+ announceHook func([]common.Hash) // Hook which is called when a batch transactions are announced
+ importTxsHook func([]*types.Transaction) // Hook which is called when a batch of transactions are imported.
+ dropHook func(string) // Hook which is called when a peer is dropped
+ cleanupHook func([]common.Hash) // Hook which is called when internal status is cleaned
+ rejectUnderprice func(common.Hash) // Hook which is called when underprice transaction is rejected
+}
+
+// NewTxFetcher creates a transaction fetcher to retrieve transaction
+// based on hash announcements.
+func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, dropPeer func(string)) *TxFetcher {
+ return &TxFetcher{
+ notify: make(chan *txsAnnounce),
+ cleanup: make(chan []common.Hash),
+ quit: make(chan struct{}),
+ announces: make(map[string]int),
+ announced: make(map[common.Hash][]*txAnnounce),
+ fetching: make(map[common.Hash]*txAnnounce),
+ underpriced: mapset.NewSet(),
+ hasTx: hasTx,
+ addTxs: addTxs,
+ dropPeer: dropPeer,
+ }
+}
+
+// Notify announces the fetcher of the potential availability of a
+// new transaction in the network.
+func (f *TxFetcher) Notify(peer string, hashes []common.Hash, time time.Time, fetchTxs func([]common.Hash)) error {
+ announce := &txsAnnounce{
+ hashes: hashes,
+ time: time,
+ origin: peer,
+ fetchTxs: fetchTxs,
+ }
+ select {
+ case f.notify <- announce:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// EnqueueTxs imports a batch of received transaction into fetcher.
+func (f *TxFetcher) EnqueueTxs(peer string, txs []*types.Transaction) error {
+ var (
+ drop bool
+ hashes []common.Hash
+ )
+ errs := f.addTxs(txs)
+ for i, err := range errs {
+ if err != nil {
+ // Drop peer if the received transaction isn't signed properly.
+ drop = (drop || err == core.ErrInvalidSender)
+ txFetchInvalidMeter.Mark(1)
+
+ // Track the transaction hash if the price is too low for us.
+ // Avoid re-request this transaction when we receive another
+ // announcement.
+ if err == core.ErrUnderpriced {
+ for f.underpriced.Cardinality() >= underpriceSetSize {
+ f.underpriced.Pop()
+ }
+ f.underpriced.Add(txs[i].Hash())
+ }
+ }
+ hashes = append(hashes, txs[i].Hash())
+ }
+ if f.importTxsHook != nil {
+ f.importTxsHook(txs)
+ }
+ // Drop the peer if some transaction failed signature verification.
+ // We can regard this peer is trying to DOS us by feeding lots of
+ // random hashes.
+ if drop {
+ f.dropPeer(peer)
+ if f.dropHook != nil {
+ f.dropHook(peer)
+ }
+ }
+ select {
+ case f.cleanup <- hashes:
+ return nil
+ case <-f.quit:
+ return errTerminated
+ }
+}
+
+// Start boots up the announcement based synchroniser, accepting and processing
+// hash notifications and block fetches until termination requested.
+func (f *TxFetcher) Start() {
+ go f.loop()
+}
+
+// Stop terminates the announcement based synchroniser, canceling all pending
+// operations.
+func (f *TxFetcher) Stop() {
+ close(f.quit)
+}
+
+func (f *TxFetcher) loop() {
+ fetchTimer := time.NewTimer(0)
+
+ for {
+ // Clean up any expired transaction fetches.
+ // There are many cases can lead to it:
+ // * We send the request to busy peer which can reply immediately
+ // * We send the request to malicious peer which doesn't reply deliberately
+ // * We send the request to normal peer for a batch of transaction, but some
+ // transactions have been included into blocks. According to EIP these txs
+ // won't be included.
+ // But it's fine to delete the fetching record and reschedule fetching iff we
+ // receive the annoucement again.
+ for hash, announce := range f.fetching {
+ if time.Since(announce.time) > txFetchTimeout {
+ delete(f.fetching, hash)
+ txFetchTimeoutMeter.Mark(1)
+ }
+ }
+ select {
+ case anno := <-f.notify:
+ txAnnounceInMeter.Mark(int64(len(anno.hashes)))
+
+ // Drop the new announce if there are too many accumulated.
+ count := f.announces[anno.origin] + len(anno.hashes)
+ if count > txAnnounceLimit {
+ txAnnounceDOSMeter.Mark(int64(count - txAnnounceLimit))
+ break
+ }
+ f.announces[anno.origin] = count
+
+ // All is well, schedule the announce if transaction is not yet downloading
+ empty := len(f.announced) == 0
+ for _, hash := range anno.hashes {
+ if _, ok := f.fetching[hash]; ok {
+ continue
+ }
+ if f.underpriced.Contains(hash) {
+ txAnnounceUnderpriceMeter.Mark(1)
+ if f.rejectUnderprice != nil {
+ f.rejectUnderprice(hash)
+ }
+ continue
+ }
+ f.announced[hash] = append(f.announced[hash], &txAnnounce{
+ origin: anno.origin,
+ time: anno.time,
+ fetchTxs: anno.fetchTxs,
+ })
+ }
+ if empty && len(f.announced) > 0 {
+ f.reschedule(fetchTimer)
+ }
+ if f.announceHook != nil {
+ f.announceHook(anno.hashes)
+ }
+ case <-fetchTimer.C:
+ // At least one tx's timer ran out, check for needing retrieval
+ request := make(map[string][]common.Hash)
+
+ for hash, announces := range f.announced {
+ if time.Since(announces[0].time) > arriveTimeout-gatherSlack {
+ // Pick a random peer to retrieve from, reset all others
+ announce := announces[rand.Intn(len(announces))]
+ f.forgetHash(hash)
+
+ // Skip fetching if we already receive the transaction.
+ if f.hasTx(hash) {
+ txAnnounceSkipMeter.Mark(1)
+ continue
+ }
+ // If the transaction still didn't arrive, queue for fetching
+ request[announce.origin] = append(request[announce.origin], hash)
+ f.fetching[hash] = announce
+ }
+ }
+ // Send out all block header requests
+ for peer, hashes := range request {
+ log.Trace("Fetching scheduled transactions", "peer", peer, "txs", hashes)
+ fetchTxs := f.fetching[hashes[0]].fetchTxs
+ fetchTxs(hashes)
+ txFetchOutMeter.Mark(int64(len(hashes)))
+ }
+ // Schedule the next fetch if blocks are still pending
+ f.reschedule(fetchTimer)
+ case hashes := <-f.cleanup:
+ for _, hash := range hashes {
+ f.forgetHash(hash)
+ anno, exist := f.fetching[hash]
+ if !exist {
+ txBroadcastInMeter.Mark(1) // Directly transaction propagation
+ continue
+ }
+ txFetchDurationTimer.UpdateSince(anno.time)
+ txFetchSuccessMeter.Mark(1)
+ delete(f.fetching, hash)
+ }
+ if f.cleanupHook != nil {
+ f.cleanupHook(hashes)
+ }
+ case <-f.quit:
+ return
+ }
+ }
+}
+
+// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout.
+func (f *TxFetcher) reschedule(fetch *time.Timer) {
+ // Short circuit if no transactions are announced
+ if len(f.announced) == 0 {
+ return
+ }
+ // Otherwise find the earliest expiring announcement
+ earliest := time.Now()
+ for _, announces := range f.announced {
+ if earliest.After(announces[0].time) {
+ earliest = announces[0].time
+ }
+ }
+ fetch.Reset(arriveTimeout - time.Since(earliest))
+}
+
+func (f *TxFetcher) forgetHash(hash common.Hash) {
+ // Remove all pending announces and decrement DOS counters
+ for _, announce := range f.announced[hash] {
+ f.announces[announce.origin]--
+ if f.announces[announce.origin] <= 0 {
+ delete(f.announces, announce.origin)
+ }
+ }
+ delete(f.announced, hash)
+}
diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go
new file mode 100644
index 000000000..26f24f3f3
--- /dev/null
+++ b/eth/fetcher/tx_fetcher_test.go
@@ -0,0 +1,318 @@
+// Copyright 2020 The go-ethereum Authors
+// This file is part of the go-ethereum library.
+//
+// The go-ethereum library is free software: you can redistribute it and/or modify
+// it under the terms of the GNU Lesser General Public License as published by
+// the Free Software Foundation, either version 3 of the License, or
+// (at your option) any later version.
+//
+// The go-ethereum library is distributed in the hope that it will be useful,
+// but WITHOUT ANY WARRANTY; without even the implied warranty of
+// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+// GNU Lesser General Public License for more details.
+//
+// You should have received a copy of the GNU Lesser General Public License
+// along with the go-ethereum library. If not, see .
+
+package fetcher
+
+import (
+ "crypto/ecdsa"
+ "math/big"
+ "math/rand"
+ "sync"
+ "sync/atomic"
+ "testing"
+ "time"
+
+ "github.com/ethereum/go-ethereum/common"
+ "github.com/ethereum/go-ethereum/core"
+ "github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/crypto"
+)
+
+func init() {
+ rand.Seed(int64(time.Now().Nanosecond()))
+
+ txAnnounceLimit = 64
+ MaxTransactionFetch = 16
+}
+
+func makeTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction {
+ var txs []*types.Transaction
+
+ for i := 0; i < target; i++ {
+ random := rand.Uint32()
+ tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil)
+ tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), key)
+ txs = append(txs, tx)
+ }
+ return txs
+}
+
+func makeUnsignedTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction {
+ var txs []*types.Transaction
+
+ for i := 0; i < target; i++ {
+ random := rand.Uint32()
+ tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil)
+ txs = append(txs, tx)
+ }
+ return txs
+}
+
+type txfetcherTester struct {
+ fetcher *TxFetcher
+
+ priceLimit *big.Int
+ sender *ecdsa.PrivateKey
+ senderAddr common.Address
+ signer types.Signer
+ txs map[common.Hash]*types.Transaction
+ dropped map[string]struct{}
+ lock sync.RWMutex
+}
+
+func newTxFetcherTester() *txfetcherTester {
+ key, _ := crypto.GenerateKey()
+ addr := crypto.PubkeyToAddress(key.PublicKey)
+ t := &txfetcherTester{
+ sender: key,
+ senderAddr: addr,
+ signer: types.NewEIP155Signer(big.NewInt(1)),
+ txs: make(map[common.Hash]*types.Transaction),
+ dropped: make(map[string]struct{}),
+ }
+ t.fetcher = NewTxFetcher(t.hasTx, t.addTxs, t.dropPeer)
+ t.fetcher.Start()
+ return t
+}
+
+func (t *txfetcherTester) hasTx(hash common.Hash) bool {
+ t.lock.RLock()
+ defer t.lock.RUnlock()
+
+ return t.txs[hash] != nil
+}
+
+func (t *txfetcherTester) addTxs(txs []*types.Transaction) []error {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ var errors []error
+ for _, tx := range txs {
+ // Make sure the transaction is signed properly
+ _, err := types.Sender(t.signer, tx)
+ if err != nil {
+ errors = append(errors, core.ErrInvalidSender)
+ continue
+ }
+ // Make sure the price is high enough to accpet
+ if t.priceLimit != nil && tx.GasPrice().Cmp(t.priceLimit) < 0 {
+ errors = append(errors, core.ErrUnderpriced)
+ continue
+ }
+ t.txs[tx.Hash()] = tx
+ errors = append(errors, nil)
+ }
+ return errors
+}
+
+func (t *txfetcherTester) dropPeer(id string) {
+ t.lock.Lock()
+ defer t.lock.Unlock()
+
+ t.dropped[id] = struct{}{}
+}
+
+// makeTxFetcher retrieves a batch of transaction associated with a simulated peer.
+func (t *txfetcherTester) makeTxFetcher(peer string, txs []*types.Transaction) func(hashes []common.Hash) {
+ closure := make(map[common.Hash]*types.Transaction)
+ for _, tx := range txs {
+ closure[tx.Hash()] = tx
+ }
+ return func(hashes []common.Hash) {
+ var txs []*types.Transaction
+ for _, hash := range hashes {
+ tx := closure[hash]
+ if tx == nil {
+ continue
+ }
+ txs = append(txs, tx)
+ }
+ // Return on a new thread
+ go t.fetcher.EnqueueTxs(peer, txs)
+ }
+}
+
+func TestSequentialTxAnnouncements(t *testing.T) {
+ tester := newTxFetcherTester()
+ txs := makeTransactions(tester.sender, txAnnounceLimit)
+
+ retrieveTxs := tester.makeTxFetcher("peer", txs)
+
+ newTxsCh := make(chan struct{})
+ tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
+ newTxsCh <- struct{}{}
+ }
+ for _, tx := range txs {
+ tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), retrieveTxs)
+ select {
+ case <-newTxsCh:
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+ }
+ if len(tester.txs) != len(txs) {
+ t.Fatalf("Imported transaction number mismatch, want %d, got %d", len(txs), len(tester.txs))
+ }
+}
+
+func TestConcurrentAnnouncements(t *testing.T) {
+ tester := newTxFetcherTester()
+ txs := makeTransactions(tester.sender, txAnnounceLimit)
+
+ txFetcherFn1 := tester.makeTxFetcher("peer1", txs)
+ txFetcherFn2 := tester.makeTxFetcher("peer2", txs)
+
+ var (
+ count uint32
+ done = make(chan struct{})
+ )
+ tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
+ atomic.AddUint32(&count, uint32(len(transactions)))
+ if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) {
+ done <- struct{}{}
+ }
+ }
+ for _, tx := range txs {
+ tester.fetcher.Notify("peer1", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn1)
+ tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout+time.Millisecond), txFetcherFn2)
+ tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout-time.Millisecond), txFetcherFn2)
+ }
+ select {
+ case <-done:
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+}
+
+func TestBatchAnnouncements(t *testing.T) {
+ tester := newTxFetcherTester()
+ txs := makeTransactions(tester.sender, txAnnounceLimit)
+
+ retrieveTxs := tester.makeTxFetcher("peer", txs)
+
+ var count uint32
+ var done = make(chan struct{})
+ tester.fetcher.importTxsHook = func(txs []*types.Transaction) {
+ atomic.AddUint32(&count, uint32(len(txs)))
+
+ if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) {
+ done <- struct{}{}
+ }
+ }
+ // Send all announces which exceeds the limit.
+ var hashes []common.Hash
+ for _, tx := range txs {
+ hashes = append(hashes, tx.Hash())
+ }
+ tester.fetcher.Notify("peer", hashes, time.Now(), retrieveTxs)
+
+ select {
+ case <-done:
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+}
+
+func TestPropagationAfterAnnounce(t *testing.T) {
+ tester := newTxFetcherTester()
+ txs := makeTransactions(tester.sender, txAnnounceLimit)
+
+ var cleaned = make(chan struct{})
+ tester.fetcher.cleanupHook = func(hashes []common.Hash) {
+ cleaned <- struct{}{}
+ }
+ retrieveTxs := tester.makeTxFetcher("peer", txs)
+ for _, tx := range txs {
+ tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), retrieveTxs)
+ tester.fetcher.EnqueueTxs("peer", []*types.Transaction{tx})
+
+ // It's ok to read the map directly since no write
+ // will happen in the same time.
+ <-cleaned
+ if len(tester.fetcher.announced) != 0 {
+ t.Fatalf("Announcement should be cleaned, got %d", len(tester.fetcher.announced))
+ }
+ }
+}
+
+func TestEnqueueTransactions(t *testing.T) {
+ tester := newTxFetcherTester()
+ txs := makeTransactions(tester.sender, txAnnounceLimit)
+
+ done := make(chan struct{})
+ tester.fetcher.importTxsHook = func(transactions []*types.Transaction) {
+ if len(transactions) == txAnnounceLimit {
+ done <- struct{}{}
+ }
+ }
+ go tester.fetcher.EnqueueTxs("peer", txs)
+ select {
+ case <-done:
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+}
+
+func TestInvalidTxAnnounces(t *testing.T) {
+ tester := newTxFetcherTester()
+
+ var txs []*types.Transaction
+ txs = append(txs, makeUnsignedTransactions(tester.sender, 1)...)
+ txs = append(txs, makeTransactions(tester.sender, 1)...)
+
+ txFetcherFn := tester.makeTxFetcher("peer", txs)
+
+ dropped := make(chan string, 1)
+ tester.fetcher.dropHook = func(s string) { dropped <- s }
+
+ for _, tx := range txs {
+ tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), txFetcherFn)
+ }
+ select {
+ case s := <-dropped:
+ if s != "peer" {
+ t.Fatalf("invalid dropped peer")
+ }
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+}
+
+func TestRejectUnderpriced(t *testing.T) {
+ tester := newTxFetcherTester()
+ tester.priceLimit = big.NewInt(10000)
+
+ done := make(chan struct{})
+ tester.fetcher.importTxsHook = func([]*types.Transaction) { done <- struct{}{} }
+ reject := make(chan struct{})
+ tester.fetcher.rejectUnderprice = func(common.Hash) { reject <- struct{}{} }
+
+ tx := types.NewTransaction(0, common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(100)), 100, big.NewInt(int64(100)), nil)
+ tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), tester.sender)
+ txFetcherFn := tester.makeTxFetcher("peer", []*types.Transaction{tx})
+
+ // Send the announcement first time
+ tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn)
+ <-done
+
+ // Resend the announcement, shouldn't schedule fetching this time
+ tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn)
+ select {
+ case <-reject:
+ case <-time.NewTimer(time.Second).C:
+ t.Fatalf("timeout")
+ }
+}
diff --git a/eth/handler.go b/eth/handler.go
index ae2b764cf..d527b15d1 100644
--- a/eth/handler.go
+++ b/eth/handler.go
@@ -77,9 +77,10 @@ type ProtocolManager struct {
blockchain *core.BlockChain
maxPeers int
- downloader *downloader.Downloader
- fetcher *fetcher.Fetcher
- peers *peerSet
+ downloader *downloader.Downloader
+ blockFetcher *fetcher.BlockFetcher
+ txFetcher *fetcher.TxFetcher
+ peers *peerSet
eventMux *event.TypeMux
txsCh chan core.NewTxsEvent
@@ -97,6 +98,9 @@ type ProtocolManager struct {
// wait group is used for graceful shutdowns during downloading
// and processing
wg sync.WaitGroup
+
+ // Test fields or hooks
+ broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation
}
// NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable
@@ -187,7 +191,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh
}
return n, err
}
- manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
+ manager.blockFetcher = fetcher.NewBlockFetcher(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer)
+ manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, manager.removePeer)
return manager, nil
}
@@ -203,7 +208,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol {
Version: version,
Length: length,
Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error {
- peer := pm.newPeer(int(version), p, rw)
+ peer := pm.newPeer(int(version), p, rw, pm.txpool.Get)
select {
case pm.newPeerCh <- peer:
pm.wg.Add(1)
@@ -286,8 +291,8 @@ func (pm *ProtocolManager) Stop() {
log.Info("Ethereum protocol stopped")
}
-func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
- return newPeer(pv, p, newMeteredMsgWriter(rw))
+func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer {
+ return newPeer(pv, p, newMeteredMsgWriter(rw), getPooledTx)
}
// handle is the callback invoked to manage the life cycle of an eth peer. When
@@ -514,7 +519,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want)
}
// Irrelevant of the fork checks, send the header to the fetcher just in case
- headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now())
+ headers = pm.blockFetcher.FilterHeaders(p.id, headers, time.Now())
}
if len(headers) > 0 || !filter {
err := pm.downloader.DeliverHeaders(p.id, headers)
@@ -567,7 +572,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Filter out any explicitly requested bodies, deliver the rest to the downloader
filter := len(transactions) > 0 || len(uncles) > 0
if filter {
- transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now())
+ transactions, uncles = pm.blockFetcher.FilterBodies(p.id, transactions, uncles, time.Now())
}
if len(transactions) > 0 || len(uncles) > 0 || !filter {
err := pm.downloader.DeliverBodies(p.id, transactions, uncles)
@@ -678,7 +683,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
for _, block := range unknown {
- pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
+ pm.blockFetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies)
}
case msg.Code == NewBlockMsg:
@@ -703,7 +708,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
// Mark the peer as owning the block and schedule it for import
p.MarkBlock(request.Block.Hash())
- pm.fetcher.Enqueue(p.id, request.Block)
+ pm.blockFetcher.Enqueue(p.id, request.Block)
// Assuming the block is importable by the peer, but possibly not yet done so,
// calculate the head hash and TD that the peer truly must have.
@@ -724,6 +729,66 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
}
+ case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65:
+ // New transaction announcement arrived, make sure we have
+ // a valid and fresh chain to handle them
+ if atomic.LoadUint32(&pm.acceptTxs) == 0 {
+ break
+ }
+ var hashes []common.Hash
+ if err := msg.Decode(&hashes); err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Schedule all the unknown hashes for retrieval
+ var unknown []common.Hash
+ for _, hash := range hashes {
+ // Mark the hashes as present at the remote node
+ p.MarkTransaction(hash)
+
+ // Filter duplicated transaction announcement.
+ // Notably we only dedupliate announcement in txpool, check the rationale
+ // behind in EIP https://github.com/ethereum/EIPs/pull/2464.
+ if pm.txpool.Has(hash) {
+ continue
+ }
+ unknown = append(unknown, hash)
+ }
+ pm.txFetcher.Notify(p.id, unknown, time.Now(), p.AsyncRequestTxs)
+
+ case msg.Code == GetPooledTransactionsMsg && p.version >= eth65:
+ // Decode the retrieval message
+ msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size))
+ if _, err := msgStream.List(); err != nil {
+ return err
+ }
+ // Gather transactions until the fetch or network limits is reached
+ var (
+ hash common.Hash
+ bytes int
+ txs []rlp.RawValue
+ )
+ for bytes < softResponseLimit {
+ // Retrieve the hash of the next block
+ if err := msgStream.Decode(&hash); err == rlp.EOL {
+ break
+ } else if err != nil {
+ return errResp(ErrDecode, "msg %v: %v", msg, err)
+ }
+ // Retrieve the requested transaction, skipping if unknown to us
+ tx := pm.txpool.Get(hash)
+ if tx == nil {
+ continue
+ }
+ // If known, encode and queue for response packet
+ if encoded, err := rlp.EncodeToBytes(tx); err != nil {
+ log.Error("Failed to encode transaction", "err", err)
+ } else {
+ txs = append(txs, encoded)
+ bytes += len(encoded)
+ }
+ }
+ return p.SendTransactionRLP(txs)
+
case msg.Code == TxMsg:
// Transactions arrived, make sure we have a valid and fresh chain to handle them
if atomic.LoadUint32(&pm.acceptTxs) == 0 {
@@ -741,7 +806,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error {
}
p.MarkTransaction(tx.Hash())
}
- pm.txpool.AddRemotes(txs)
+ pm.txFetcher.EnqueueTxs(p.id, txs)
default:
return errResp(ErrInvalidMsgCode, "%v", msg.Code)
@@ -791,20 +856,48 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
// BroadcastTxs will propagate a batch of transactions to all peers which are not known to
// already have the given transaction.
-func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) {
- var txset = make(map[*peer]types.Transactions)
-
+func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions, propagate bool) {
+ var (
+ txset = make(map[*peer][]common.Hash)
+ annos = make(map[*peer][]common.Hash)
+ )
// Broadcast transactions to a batch of peers not knowing about it
+ if propagate {
+ for _, tx := range txs {
+ peers := pm.peers.PeersWithoutTx(tx.Hash())
+
+ // Send the block to a subset of our peers
+ transferLen := int(math.Sqrt(float64(len(peers))))
+ if transferLen < minBroadcastPeers {
+ transferLen = minBroadcastPeers
+ }
+ if transferLen > len(peers) {
+ transferLen = len(peers)
+ }
+ transfer := peers[:transferLen]
+ for _, peer := range transfer {
+ txset[peer] = append(txset[peer], tx.Hash())
+ }
+ log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
+ }
+ for peer, hashes := range txset {
+ peer.AsyncSendTransactions(hashes)
+ }
+ return
+ }
+ // Otherwise only broadcast the announcement to peers
for _, tx := range txs {
peers := pm.peers.PeersWithoutTx(tx.Hash())
for _, peer := range peers {
- txset[peer] = append(txset[peer], tx)
+ annos[peer] = append(annos[peer], tx.Hash())
}
- log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers))
}
- // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))]
- for peer, txs := range txset {
- peer.AsyncSendTransactions(txs)
+ for peer, hashes := range annos {
+ if peer.version >= eth65 {
+ peer.AsyncSendTransactionHashes(hashes)
+ } else {
+ peer.AsyncSendTransactions(hashes)
+ }
}
}
@@ -823,7 +916,13 @@ func (pm *ProtocolManager) txBroadcastLoop() {
for {
select {
case event := <-pm.txsCh:
- pm.BroadcastTxs(event.Txs)
+ // For testing purpose only, disable propagation
+ if pm.broadcastTxAnnouncesOnly {
+ pm.BroadcastTxs(event.Txs, false)
+ continue
+ }
+ pm.BroadcastTxs(event.Txs, true) // First propagate transactions to peers
+ pm.BroadcastTxs(event.Txs, false) // Only then announce to the rest
// Err() channel will be closed when unsubscribing.
case <-pm.txsSub.Err():
diff --git a/eth/handler_test.go b/eth/handler_test.go
index 354cbc068..97613a983 100644
--- a/eth/handler_test.go
+++ b/eth/handler_test.go
@@ -495,7 +495,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
- pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil)
+ pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
@@ -582,7 +582,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
- pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil)
+ pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
diff --git a/eth/helper_test.go b/eth/helper_test.go
index e66910334..bec37e16c 100644
--- a/eth/helper_test.go
+++ b/eth/helper_test.go
@@ -68,7 +68,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func
if _, err := blockchain.InsertChain(chain); err != nil {
panic(err)
}
- pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil)
+ pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil)
if err != nil {
return nil, nil, err
}
@@ -91,22 +91,43 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i
// testTxPool is a fake, helper transaction pool for testing purposes
type testTxPool struct {
txFeed event.Feed
- pool []*types.Transaction // Collection of all transactions
- added chan<- []*types.Transaction // Notification channel for new transactions
+ pool map[common.Hash]*types.Transaction // Hash map of collected transactions
+ added chan<- []*types.Transaction // Notification channel for new transactions
lock sync.RWMutex // Protects the transaction pool
}
+// Has returns an indicator whether txpool has a transaction
+// cached with the given hash.
+func (p *testTxPool) Has(hash common.Hash) bool {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ return p.pool[hash] != nil
+}
+
+// Get retrieves the transaction from local txpool with given
+// tx hash.
+func (p *testTxPool) Get(hash common.Hash) *types.Transaction {
+ p.lock.Lock()
+ defer p.lock.Unlock()
+
+ return p.pool[hash]
+}
+
// AddRemotes appends a batch of transactions to the pool, and notifies any
// listeners if the addition channel is non nil
func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error {
p.lock.Lock()
defer p.lock.Unlock()
- p.pool = append(p.pool, txs...)
+ for _, tx := range txs {
+ p.pool[tx.Hash()] = tx
+ }
if p.added != nil {
p.added <- txs
}
+ p.txFeed.Send(core.NewTxsEvent{Txs: txs})
return make([]error, len(txs))
}
@@ -153,7 +174,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te
var id enode.ID
rand.Read(id[:])
- peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net)
+ peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get)
// Start the peer on a new thread
errc := make(chan error, 1)
@@ -191,7 +212,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesi
CurrentBlock: head,
GenesisBlock: genesis,
}
- case p.version == eth64:
+ case p.version >= eth64:
msg = &statusData{
ProtocolVersion: uint32(p.version),
NetworkID: DefaultConfig.NetworkId,
diff --git a/eth/peer.go b/eth/peer.go
index 0beec1d84..f4b939b71 100644
--- a/eth/peer.go
+++ b/eth/peer.go
@@ -27,6 +27,7 @@ import (
"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/core/forkid"
"github.com/ethereum/go-ethereum/core/types"
+ "github.com/ethereum/go-ethereum/eth/fetcher"
"github.com/ethereum/go-ethereum/p2p"
"github.com/ethereum/go-ethereum/rlp"
)
@@ -41,24 +42,39 @@ const (
maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS)
maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS)
- // maxQueuedTxs is the maximum number of transaction lists to queue up before
- // dropping broadcasts. This is a sensitive number as a transaction list might
- // contain a single transaction, or thousands.
- maxQueuedTxs = 128
+ // maxQueuedTxs is the maximum number of transactions to queue up before dropping
+ // broadcasts.
+ maxQueuedTxs = 4096
- // maxQueuedProps is the maximum number of block propagations to queue up before
+ // maxQueuedTxAnns is the maximum number of transaction announcements to queue up
+ // before dropping broadcasts.
+ maxQueuedTxAnns = 4096
+
+ // maxQueuedTxRetrieval is the maximum number of tx retrieval requests to queue up
+ // before dropping requests.
+ maxQueuedTxRetrieval = 4096
+
+ // maxQueuedBlocks is the maximum number of block propagations to queue up before
// dropping broadcasts. There's not much point in queueing stale blocks, so a few
// that might cover uncles should be enough.
- maxQueuedProps = 4
+ maxQueuedBlocks = 4
- // maxQueuedAnns is the maximum number of block announcements to queue up before
+ // maxQueuedBlockAnns is the maximum number of block announcements to queue up before
// dropping broadcasts. Similarly to block propagations, there's no point to queue
// above some healthy uncle limit, so use that.
- maxQueuedAnns = 4
+ maxQueuedBlockAnns = 4
handshakeTimeout = 5 * time.Second
)
+// max is a helper function which returns the larger of the two given integers.
+func max(a, b int) int {
+ if a > b {
+ return a
+ }
+ return b
+}
+
// PeerInfo represents a short summary of the Ethereum sub-protocol metadata known
// about a connected peer.
type PeerInfo struct {
@@ -86,48 +102,48 @@ type peer struct {
td *big.Int
lock sync.RWMutex
- knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
- knownBlocks mapset.Set // Set of block hashes known to be known by this peer
- queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer
- queuedProps chan *propEvent // Queue of blocks to broadcast to the peer
- queuedAnns chan *types.Block // Queue of blocks to announce to the peer
- term chan struct{} // Termination channel to stop the broadcaster
+ knownTxs mapset.Set // Set of transaction hashes known to be known by this peer
+ knownBlocks mapset.Set // Set of block hashes known to be known by this peer
+ queuedBlocks chan *propEvent // Queue of blocks to broadcast to the peer
+ queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer
+ txPropagation chan []common.Hash // Channel used to queue transaction propagation requests
+ txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests
+ txRetrieval chan []common.Hash // Channel used to queue transaction retrieval requests
+ getPooledTx func(common.Hash) *types.Transaction // Callback used to retrieve transaction from txpool
+ term chan struct{} // Termination channel to stop the broadcaster
}
-func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer {
+func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer {
return &peer{
- Peer: p,
- rw: rw,
- version: version,
- id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
- knownTxs: mapset.NewSet(),
- knownBlocks: mapset.NewSet(),
- queuedTxs: make(chan []*types.Transaction, maxQueuedTxs),
- queuedProps: make(chan *propEvent, maxQueuedProps),
- queuedAnns: make(chan *types.Block, maxQueuedAnns),
- term: make(chan struct{}),
+ Peer: p,
+ rw: rw,
+ version: version,
+ id: fmt.Sprintf("%x", p.ID().Bytes()[:8]),
+ knownTxs: mapset.NewSet(),
+ knownBlocks: mapset.NewSet(),
+ queuedBlocks: make(chan *propEvent, maxQueuedBlocks),
+ queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns),
+ txPropagation: make(chan []common.Hash),
+ txAnnounce: make(chan []common.Hash),
+ txRetrieval: make(chan []common.Hash),
+ getPooledTx: getPooledTx,
+ term: make(chan struct{}),
}
}
-// broadcast is a write loop that multiplexes block propagations, announcements
-// and transaction broadcasts into the remote peer. The goal is to have an async
-// writer that does not lock up node internals.
-func (p *peer) broadcast() {
+// broadcastBlocks is a write loop that multiplexes block propagations,
+// announcements into the remote peer. The goal is to have an async writer
+// that does not lock up node internals.
+func (p *peer) broadcastBlocks() {
for {
select {
- case txs := <-p.queuedTxs:
- if err := p.SendTransactions(txs); err != nil {
- return
- }
- p.Log().Trace("Broadcast transactions", "count", len(txs))
-
- case prop := <-p.queuedProps:
+ case prop := <-p.queuedBlocks:
if err := p.SendNewBlock(prop.block, prop.td); err != nil {
return
}
p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td)
- case block := <-p.queuedAnns:
+ case block := <-p.queuedBlockAnns:
if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil {
return
}
@@ -139,6 +155,175 @@ func (p *peer) broadcast() {
}
}
+// broadcastTxs is a write loop that multiplexes transaction propagations,
+// announcements into the remote peer. The goal is to have an async writer
+// that does not lock up node internals.
+func (p *peer) broadcastTxs() {
+ var (
+ txProps []common.Hash // Queue of transaction propagations to the peer
+ txAnnos []common.Hash // Queue of transaction announcements to the peer
+ done chan struct{} // Non-nil if background network sender routine is active.
+ errch = make(chan error) // Channel used to receive network error
+ )
+ scheduleTask := func() {
+ // Short circuit if there already has a inflight task.
+ if done != nil {
+ return
+ }
+ // Spin up transaction propagation task if there is any
+ // queued hashes.
+ if len(txProps) > 0 {
+ var (
+ hashes []common.Hash
+ txs []*types.Transaction
+ size common.StorageSize
+ )
+ for i := 0; i < len(txProps) && size < txsyncPackSize; i++ {
+ if tx := p.getPooledTx(txProps[i]); tx != nil {
+ txs = append(txs, tx)
+ size += tx.Size()
+ }
+ hashes = append(hashes, txProps[i])
+ }
+ txProps = txProps[:copy(txProps, txProps[len(hashes):])]
+ if len(txs) > 0 {
+ done = make(chan struct{})
+ go func() {
+ if err := p.SendNewTransactions(txs); err != nil {
+ errch <- err
+ return
+ }
+ close(done)
+ p.Log().Trace("Sent transactions", "count", len(txs))
+ }()
+ return
+ }
+ }
+ // Spin up transaction announcement task if there is any
+ // queued hashes.
+ if len(txAnnos) > 0 {
+ var (
+ hashes []common.Hash
+ pending []common.Hash
+ size common.StorageSize
+ )
+ for i := 0; i < len(txAnnos) && size < txsyncPackSize; i++ {
+ if tx := p.getPooledTx(txAnnos[i]); tx != nil {
+ pending = append(pending, txAnnos[i])
+ size += common.HashLength
+ }
+ hashes = append(hashes, txAnnos[i])
+ }
+ txAnnos = txAnnos[:copy(txAnnos, txAnnos[len(hashes):])]
+ if len(pending) > 0 {
+ done = make(chan struct{})
+ go func() {
+ if err := p.SendNewTransactionHashes(pending); err != nil {
+ errch <- err
+ return
+ }
+ close(done)
+ p.Log().Trace("Sent transaction announcements", "count", len(pending))
+ }()
+ }
+ }
+ }
+
+ for {
+ scheduleTask()
+ select {
+ case hashes := <-p.txPropagation:
+ if len(txProps) == maxQueuedTxs {
+ continue
+ }
+ if len(txProps)+len(hashes) > maxQueuedTxs {
+ hashes = hashes[:maxQueuedTxs-len(txProps)]
+ }
+ txProps = append(txProps, hashes...)
+
+ case hashes := <-p.txAnnounce:
+ if len(txAnnos) == maxQueuedTxAnns {
+ continue
+ }
+ if len(txAnnos)+len(hashes) > maxQueuedTxAnns {
+ hashes = hashes[:maxQueuedTxAnns-len(txAnnos)]
+ }
+ txAnnos = append(txAnnos, hashes...)
+
+ case <-done:
+ done = nil
+
+ case <-errch:
+ return
+
+ case <-p.term:
+ return
+ }
+ }
+}
+
+// retrievalTxs is a write loop which is responsible for retrieving transaction
+// from the remote peer. The goal is to have an async writer that does not lock
+// up node internals. If there are too many requests queued, then new arrival
+// requests will be dropped silently so that we can ensure the memory assumption
+// is fixed for each peer.
+func (p *peer) retrievalTxs() {
+ var (
+ requests []common.Hash // Queue of transaction requests to the peer
+ done chan struct{} // Non-nil if background network sender routine is active.
+ errch = make(chan error) // Channel used to receive network error
+ )
+ // pick chooses a reasonble number of transaction hashes for retrieval.
+ pick := func() []common.Hash {
+ var ret []common.Hash
+ if len(requests) > fetcher.MaxTransactionFetch {
+ ret = requests[:fetcher.MaxTransactionFetch]
+ } else {
+ ret = requests[:]
+ }
+ requests = requests[:copy(requests, requests[len(ret):])]
+ return ret
+ }
+ // send sends transactions retrieval request.
+ send := func(hashes []common.Hash, done chan struct{}) {
+ if err := p.RequestTxs(hashes); err != nil {
+ errch <- err
+ return
+ }
+ close(done)
+ p.Log().Trace("Sent transaction retrieval request", "count", len(hashes))
+ }
+ for {
+ select {
+ case hashes := <-p.txRetrieval:
+ if len(requests) == maxQueuedTxRetrieval {
+ continue
+ }
+ if len(requests)+len(hashes) > maxQueuedTxRetrieval {
+ hashes = hashes[:maxQueuedTxRetrieval-len(requests)]
+ }
+ requests = append(requests, hashes...)
+ if done == nil {
+ done = make(chan struct{})
+ go send(pick(), done)
+ }
+
+ case <-done:
+ done = nil
+ if pending := pick(); len(pending) > 0 {
+ done = make(chan struct{})
+ go send(pending, done)
+ }
+
+ case <- errch:
+ return
+
+ case <-p.term:
+ return
+ }
+ }
+}
+
// close signals the broadcast goroutine to terminate.
func (p *peer) close() {
close(p.term)
@@ -194,33 +379,67 @@ func (p *peer) MarkTransaction(hash common.Hash) {
p.knownTxs.Add(hash)
}
-// SendTransactions sends transactions to the peer and includes the hashes
-// in its transaction hash set for future reference.
-func (p *peer) SendTransactions(txs types.Transactions) error {
+// SendNewTransactionHashes sends a batch of transaction hashes to the peer and
+// includes the hashes in its transaction hash set for future reference.
+func (p *peer) SendNewTransactionHashes(hashes []common.Hash) error {
// Mark all the transactions as known, but ensure we don't overflow our limits
+ for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
+ p.knownTxs.Pop()
+ }
+ for _, hash := range hashes {
+ p.knownTxs.Add(hash)
+ }
+ return p2p.Send(p.rw, NewPooledTransactionHashesMsg, hashes)
+}
+
+// SendNewTransactions sends transactions to the peer and includes the hashes
+// in its transaction hash set for future reference.
+func (p *peer) SendNewTransactions(txs types.Transactions) error {
+ // Mark all the transactions as known, but ensure we don't overflow our limits
+ for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) {
+ p.knownTxs.Pop()
+ }
for _, tx := range txs {
p.knownTxs.Add(tx.Hash())
}
- for p.knownTxs.Cardinality() >= maxKnownTxs {
- p.knownTxs.Pop()
- }
+ return p2p.Send(p.rw, TxMsg, txs)
+}
+
+func (p *peer) SendTransactionRLP(txs []rlp.RawValue) error {
return p2p.Send(p.rw, TxMsg, txs)
}
// AsyncSendTransactions queues list of transactions propagation to a remote
// peer. If the peer's broadcast queue is full, the event is silently dropped.
-func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
+func (p *peer) AsyncSendTransactions(hashes []common.Hash) {
select {
- case p.queuedTxs <- txs:
+ case p.txPropagation <- hashes:
// Mark all the transactions as known, but ensure we don't overflow our limits
- for _, tx := range txs {
- p.knownTxs.Add(tx.Hash())
- }
- for p.knownTxs.Cardinality() >= maxKnownTxs {
+ for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
p.knownTxs.Pop()
}
- default:
- p.Log().Debug("Dropping transaction propagation", "count", len(txs))
+ for _, hash := range hashes {
+ p.knownTxs.Add(hash)
+ }
+ case <-p.term:
+ p.Log().Debug("Dropping transaction propagation", "count", len(hashes))
+ }
+}
+
+// AsyncSendTransactions queues list of transactions propagation to a remote
+// peer. If the peer's broadcast queue is full, the event is silently dropped.
+func (p *peer) AsyncSendTransactionHashes(hashes []common.Hash) {
+ select {
+ case p.txAnnounce <- hashes:
+ // Mark all the transactions as known, but ensure we don't overflow our limits
+ for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) {
+ p.knownTxs.Pop()
+ }
+ for _, hash := range hashes {
+ p.knownTxs.Add(hash)
+ }
+ case <-p.term:
+ p.Log().Debug("Dropping transaction announcement", "count", len(hashes))
}
}
@@ -228,12 +447,12 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) {
// a hash notification.
func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error {
// Mark all the block hashes as known, but ensure we don't overflow our limits
+ for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) {
+ p.knownBlocks.Pop()
+ }
for _, hash := range hashes {
p.knownBlocks.Add(hash)
}
- for p.knownBlocks.Cardinality() >= maxKnownBlocks {
- p.knownBlocks.Pop()
- }
request := make(newBlockHashesData, len(hashes))
for i := 0; i < len(hashes); i++ {
request[i].Hash = hashes[i]
@@ -247,12 +466,12 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error
// dropped.
func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
select {
- case p.queuedAnns <- block:
+ case p.queuedBlockAnns <- block:
// Mark all the block hash as known, but ensure we don't overflow our limits
- p.knownBlocks.Add(block.Hash())
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
+ p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash())
}
@@ -261,10 +480,10 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) {
// SendNewBlock propagates an entire block to a remote peer.
func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
// Mark all the block hash as known, but ensure we don't overflow our limits
- p.knownBlocks.Add(block.Hash())
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
+ p.knownBlocks.Add(block.Hash())
return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td})
}
@@ -272,12 +491,12 @@ func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error {
// the peer's broadcast queue is full, the event is silently dropped.
func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) {
select {
- case p.queuedProps <- &propEvent{block: block, td: td}:
+ case p.queuedBlocks <- &propEvent{block: block, td: td}:
// Mark all the block hash as known, but ensure we don't overflow our limits
- p.knownBlocks.Add(block.Hash())
for p.knownBlocks.Cardinality() >= maxKnownBlocks {
p.knownBlocks.Pop()
}
+ p.knownBlocks.Add(block.Hash())
default:
p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash())
}
@@ -352,6 +571,22 @@ func (p *peer) RequestReceipts(hashes []common.Hash) error {
return p2p.Send(p.rw, GetReceiptsMsg, hashes)
}
+// RequestTxs fetches a batch of transactions from a remote node.
+func (p *peer) RequestTxs(hashes []common.Hash) error {
+ p.Log().Debug("Fetching batch of transactions", "count", len(hashes))
+ return p2p.Send(p.rw, GetPooledTransactionsMsg, hashes)
+}
+
+// AsyncRequestTxs queues a tx retrieval request to a remote peer. If
+// the peer's retrieval queue is full, the event is silently dropped.
+func (p *peer) AsyncRequestTxs(hashes []common.Hash) {
+ select {
+ case p.txRetrieval <- hashes:
+ case <-p.term:
+ p.Log().Debug("Dropping transaction retrieval request", "count", len(hashes))
+ }
+}
+
// Handshake executes the eth protocol handshake, negotiating version number,
// network IDs, difficulties, head and genesis blocks.
func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error {
@@ -372,7 +607,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
CurrentBlock: head,
GenesisBlock: genesis,
})
- case p.version == eth64:
+ case p.version >= eth64:
errc <- p2p.Send(p.rw, StatusMsg, &statusData{
ProtocolVersion: uint32(p.version),
NetworkID: network,
@@ -389,7 +624,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
switch {
case p.version == eth63:
errc <- p.readStatusLegacy(network, &status63, genesis)
- case p.version == eth64:
+ case p.version >= eth64:
errc <- p.readStatus(network, &status, genesis, forkFilter)
default:
panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
@@ -410,7 +645,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis
switch {
case p.version == eth63:
p.td, p.head = status63.TD, status63.CurrentBlock
- case p.version == eth64:
+ case p.version >= eth64:
p.td, p.head = status.TD, status.Head
default:
panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version))
@@ -511,7 +746,9 @@ func (ps *peerSet) Register(p *peer) error {
return errAlreadyRegistered
}
ps.peers[p.id] = p
- go p.broadcast()
+ go p.broadcastBlocks()
+ go p.broadcastTxs()
+ go p.retrievalTxs()
return nil
}
diff --git a/eth/protocol.go b/eth/protocol.go
index 62e4d13d1..1cef66adb 100644
--- a/eth/protocol.go
+++ b/eth/protocol.go
@@ -33,16 +33,17 @@ import (
const (
eth63 = 63
eth64 = 64
+ eth65 = 65
)
// protocolName is the official short name of the protocol used during capability negotiation.
const protocolName = "eth"
// ProtocolVersions are the supported versions of the eth protocol (first is primary).
-var ProtocolVersions = []uint{eth64, eth63}
+var ProtocolVersions = []uint{eth65, eth64, eth63}
// protocolLengths are the number of implemented message corresponding to different protocol versions.
-var protocolLengths = map[uint]uint64{eth64: 17, eth63: 17}
+var protocolLengths = map[uint]uint64{eth65: 17, eth64: 17, eth63: 17}
const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message
@@ -60,6 +61,13 @@ const (
NodeDataMsg = 0x0e
GetReceiptsMsg = 0x0f
ReceiptsMsg = 0x10
+
+ // New protocol message codes introduced in eth65
+ //
+ // Previously these message ids(0x08, 0x09) were used by some
+ // legacy and unsupported eth protocols, reown them here.
+ NewPooledTransactionHashesMsg = 0x08
+ GetPooledTransactionsMsg = 0x09
)
type errCode int
@@ -94,6 +102,14 @@ var errorToString = map[int]string{
}
type txPool interface {
+ // Has returns an indicator whether txpool has a transaction
+ // cached with the given hash.
+ Has(hash common.Hash) bool
+
+ // Get retrieves the transaction from local txpool with given
+ // tx hash.
+ Get(hash common.Hash) *types.Transaction
+
// AddRemotes should add the given transactions to the pool.
AddRemotes([]*types.Transaction) []error
diff --git a/eth/protocol_test.go b/eth/protocol_test.go
index ca418942b..e9a1a511e 100644
--- a/eth/protocol_test.go
+++ b/eth/protocol_test.go
@@ -20,6 +20,7 @@ import (
"fmt"
"math/big"
"sync"
+ "sync/atomic"
"testing"
"time"
@@ -180,16 +181,16 @@ func TestForkIDSplit(t *testing.T) {
blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil)
blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil)
- ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainNoFork, dbNoFork, 1, nil)
- ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainProFork, dbProFork, 1, nil)
+ ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil)
+ ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil)
)
ethNoFork.Start(1000)
ethProFork.Start(1000)
// Both nodes should allow the other to connect (same genesis, next fork is the same)
p2pNoFork, p2pProFork := p2p.MsgPipe()
- peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
- peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
+ peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
+ peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc := make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
@@ -207,8 +208,8 @@ func TestForkIDSplit(t *testing.T) {
chainProFork.InsertChain(blocksProFork[:1])
p2pNoFork, p2pProFork = p2p.MsgPipe()
- peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
- peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
+ peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
+ peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc = make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
@@ -226,8 +227,8 @@ func TestForkIDSplit(t *testing.T) {
chainProFork.InsertChain(blocksProFork[1:2])
p2pNoFork, p2pProFork = p2p.MsgPipe()
- peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork)
- peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork)
+ peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil)
+ peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil)
errc = make(chan error, 2)
go func() { errc <- ethNoFork.handle(peerProFork) }()
@@ -246,6 +247,7 @@ func TestForkIDSplit(t *testing.T) {
// This test checks that received transactions are added to the local pool.
func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) }
func TestRecvTransactions64(t *testing.T) { testRecvTransactions(t, 64) }
+func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, 65) }
func testRecvTransactions(t *testing.T, protocol int) {
txAdded := make(chan []*types.Transaction)
@@ -274,6 +276,7 @@ func testRecvTransactions(t *testing.T, protocol int) {
// This test checks that pending transactions are sent.
func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) }
func TestSendTransactions64(t *testing.T) { testSendTransactions(t, 64) }
+func TestSendTransactions65(t *testing.T) { testSendTransactions(t, 65) }
func testSendTransactions(t *testing.T, protocol int) {
pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil)
@@ -298,17 +301,43 @@ func testSendTransactions(t *testing.T, protocol int) {
}
for n := 0; n < len(alltxs) && !t.Failed(); {
var txs []*types.Transaction
- msg, err := p.app.ReadMsg()
- if err != nil {
- t.Errorf("%v: read error: %v", p.Peer, err)
- } else if msg.Code != TxMsg {
- t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
+ var hashes []common.Hash
+ var forAllHashes func(callback func(hash common.Hash))
+ switch protocol {
+ case 63:
+ fallthrough
+ case 64:
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != TxMsg {
+ t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&txs); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ forAllHashes = func(callback func(hash common.Hash)) {
+ for _, tx := range txs {
+ callback(tx.Hash())
+ }
+ }
+ case 65:
+ msg, err := p.app.ReadMsg()
+ if err != nil {
+ t.Errorf("%v: read error: %v", p.Peer, err)
+ } else if msg.Code != NewPooledTransactionHashesMsg {
+ t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code)
+ }
+ if err := msg.Decode(&hashes); err != nil {
+ t.Errorf("%v: %v", p.Peer, err)
+ }
+ forAllHashes = func(callback func(hash common.Hash)) {
+ for _, h := range hashes {
+ callback(h)
+ }
+ }
}
- if err := msg.Decode(&txs); err != nil {
- t.Errorf("%v: %v", p.Peer, err)
- }
- for _, tx := range txs {
- hash := tx.Hash()
+ forAllHashes(func(hash common.Hash) {
seentx, want := seen[hash]
if seentx {
t.Errorf("%v: got tx more than once: %x", p.Peer, hash)
@@ -318,7 +347,7 @@ func testSendTransactions(t *testing.T, protocol int) {
}
seen[hash] = true
n++
- }
+ })
}
}
for i := 0; i < 3; i++ {
@@ -329,6 +358,53 @@ func testSendTransactions(t *testing.T, protocol int) {
wg.Wait()
}
+func TestTransactionPropagation(t *testing.T) { testSyncTransaction(t, true) }
+func TestTransactionAnnouncement(t *testing.T) { testSyncTransaction(t, false) }
+
+func testSyncTransaction(t *testing.T, propagtion bool) {
+ // Create a protocol manager for transaction fetcher and sender
+ pmFetcher, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil)
+ defer pmFetcher.Stop()
+ pmSender, _ := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil)
+ pmSender.broadcastTxAnnouncesOnly = !propagtion
+ defer pmSender.Stop()
+
+ // Sync up the two peers
+ io1, io2 := p2p.MsgPipe()
+
+ go pmSender.handle(pmSender.newPeer(65, p2p.NewPeer(enode.ID{}, "sender", nil), io2, pmSender.txpool.Get))
+ go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get))
+
+ time.Sleep(250 * time.Millisecond)
+ pmFetcher.synchronise(pmFetcher.peers.BestPeer())
+ atomic.StoreUint32(&pmFetcher.acceptTxs, 1)
+
+ newTxs := make(chan core.NewTxsEvent, 1024)
+ sub := pmFetcher.txpool.SubscribeNewTxsEvent(newTxs)
+ defer sub.Unsubscribe()
+
+ // Fill the pool with new transactions
+ alltxs := make([]*types.Transaction, 1024)
+ for nonce := range alltxs {
+ alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), 0)
+ }
+ pmSender.txpool.AddRemotes(alltxs)
+
+ var got int
+loop:
+ for {
+ select {
+ case ev := <-newTxs:
+ got += len(ev.Txs)
+ if got == 1024 {
+ break loop
+ }
+ case <-time.NewTimer(time.Second).C:
+ t.Fatal("Failed to retrieve all transaction")
+ }
+ }
+}
+
// Tests that the custom union field encoder and decoder works correctly.
func TestGetBlockHeadersDataEncodeDecode(t *testing.T) {
// Create a "random" hash for testing
diff --git a/eth/sync.go b/eth/sync.go
index 9e180ee20..93b2dd2ec 100644
--- a/eth/sync.go
+++ b/eth/sync.go
@@ -38,8 +38,9 @@ const (
)
type txsync struct {
- p *peer
- txs []*types.Transaction
+ p *peer
+ hashes []common.Hash
+ txs []*types.Transaction
}
// syncTransactions starts sending all currently pending transactions to the given peer.
@@ -53,7 +54,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) {
return
}
select {
- case pm.txsyncCh <- &txsync{p, txs}:
+ case pm.txsyncCh <- &txsync{p: p, txs: txs}:
case <-pm.quitSync:
}
}
@@ -69,26 +70,46 @@ func (pm *ProtocolManager) txsyncLoop() {
pack = new(txsync) // the pack that is being sent
done = make(chan error, 1) // result of the send
)
-
// send starts a sending a pack of transactions from the sync.
send := func(s *txsync) {
// Fill pack with transactions up to the target size.
size := common.StorageSize(0)
pack.p = s.p
+ pack.hashes = pack.hashes[:0]
pack.txs = pack.txs[:0]
- for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
- pack.txs = append(pack.txs, s.txs[i])
- size += s.txs[i].Size()
+ if s.p.version >= eth65 {
+ // Eth65 introduces transaction announcement https://github.com/ethereum/EIPs/pull/2464,
+ // only txhashes are transferred here.
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.hashes = append(pack.hashes, s.txs[i].Hash())
+ size += common.HashLength
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.hashes):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transaction announcements", "count", len(pack.hashes), "bytes", size)
+ sending = true
+ go func() { done <- pack.p.SendNewTransactionHashes(pack.hashes) }()
+ } else {
+ // Legacy eth protocol doesn't have transaction announcement protocol
+ // message, transfer the whole pending transaction slice.
+ for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ {
+ pack.txs = append(pack.txs, s.txs[i])
+ size += s.txs[i].Size()
+ }
+ // Remove the transactions that will be sent.
+ s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
+ if len(s.txs) == 0 {
+ delete(pending, s.p.ID())
+ }
+ // Send the pack in the background.
+ s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
+ sending = true
+ go func() { done <- pack.p.SendNewTransactions(pack.txs) }()
}
- // Remove the transactions that will be sent.
- s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])]
- if len(s.txs) == 0 {
- delete(pending, s.p.ID())
- }
- // Send the pack in the background.
- s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size)
- sending = true
- go func() { done <- pack.p.SendTransactions(pack.txs) }()
}
// pick chooses the next pending sync.
@@ -133,8 +154,10 @@ func (pm *ProtocolManager) txsyncLoop() {
// downloading hashes and blocks as well as handling the announcement handler.
func (pm *ProtocolManager) syncer() {
// Start and ensure cleanup of sync mechanisms
- pm.fetcher.Start()
- defer pm.fetcher.Stop()
+ pm.blockFetcher.Start()
+ pm.txFetcher.Start()
+ defer pm.blockFetcher.Stop()
+ defer pm.txFetcher.Stop()
defer pm.downloader.Terminate()
// Wait for different events to fire synchronisation operations
diff --git a/eth/sync_test.go b/eth/sync_test.go
index e4c99ff58..d02bc5710 100644
--- a/eth/sync_test.go
+++ b/eth/sync_test.go
@@ -26,9 +26,13 @@ import (
"github.com/ethereum/go-ethereum/p2p/enode"
)
+func TestFastSyncDisabling63(t *testing.T) { testFastSyncDisabling(t, 63) }
+func TestFastSyncDisabling64(t *testing.T) { testFastSyncDisabling(t, 64) }
+func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, 65) }
+
// Tests that fast sync gets disabled as soon as a real block is successfully
// imported into the blockchain.
-func TestFastSyncDisabling(t *testing.T) {
+func testFastSyncDisabling(t *testing.T, protocol int) {
// Create a pristine protocol manager, check that fast sync is left enabled
pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil)
if atomic.LoadUint32(&pmEmpty.fastSync) == 0 {
@@ -42,8 +46,8 @@ func TestFastSyncDisabling(t *testing.T) {
// Sync up the two peers
io1, io2 := p2p.MsgPipe()
- go pmFull.handle(pmFull.newPeer(63, p2p.NewPeer(enode.ID{}, "empty", nil), io2))
- go pmEmpty.handle(pmEmpty.newPeer(63, p2p.NewPeer(enode.ID{}, "full", nil), io1))
+ go pmFull.handle(pmFull.newPeer(protocol, p2p.NewPeer(enode.ID{}, "empty", nil), io2, pmFull.txpool.Get))
+ go pmEmpty.handle(pmEmpty.newPeer(protocol, p2p.NewPeer(enode.ID{}, "full", nil), io1, pmEmpty.txpool.Get))
time.Sleep(250 * time.Millisecond)
pmEmpty.synchronise(pmEmpty.peers.BestPeer())