From 049e17116e5e00a052384905a433fed5245ea5c4 Mon Sep 17 00:00:00 2001 From: rjl493456442 Date: Mon, 28 Oct 2019 19:59:07 +0800 Subject: [PATCH] core, eth: implement eth/65 transaction fetcher --- core/tx_pool.go | 6 + eth/downloader/peer.go | 8 +- eth/fetcher/{fetcher.go => block_fetcher.go} | 129 ++++--- ...{fetcher_test.go => block_fetcher_test.go} | 4 +- eth/fetcher/metrics.go | 27 +- eth/fetcher/tx_fetcher.go | 319 +++++++++++++++ eth/fetcher/tx_fetcher_test.go | 318 +++++++++++++++ eth/handler.go | 141 ++++++- eth/handler_test.go | 4 +- eth/helper_test.go | 33 +- eth/peer.go | 363 +++++++++++++++--- eth/protocol.go | 20 +- eth/protocol_test.go | 114 +++++- eth/sync.go | 59 ++- eth/sync_test.go | 10 +- 15 files changed, 1344 insertions(+), 211 deletions(-) rename eth/fetcher/{fetcher.go => block_fetcher.go} (84%) rename eth/fetcher/{fetcher_test.go => block_fetcher_test.go} (99%) create mode 100644 eth/fetcher/tx_fetcher.go create mode 100644 eth/fetcher/tx_fetcher_test.go diff --git a/core/tx_pool.go b/core/tx_pool.go index ae6962c5d..16d80d644 100644 --- a/core/tx_pool.go +++ b/core/tx_pool.go @@ -864,6 +864,12 @@ func (pool *TxPool) Get(hash common.Hash) *types.Transaction { return pool.all.Get(hash) } +// Has returns an indicator whether txpool has a transaction cached with the +// given hash. +func (pool *TxPool) Has(hash common.Hash) bool { + return pool.all.Get(hash) != nil +} + // removeTx removes a single transaction from the queue, moving all subsequent // transactions back to the future queue. func (pool *TxPool) removeTx(hash common.Hash, outofbound bool) { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 60f86d0e1..5c2020d7d 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -470,7 +470,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.headerThroughput } - return ps.idlePeers(62, 64, idle, throughput) + return ps.idlePeers(62, 65, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -484,7 +484,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.blockThroughput } - return ps.idlePeers(62, 64, idle, throughput) + return ps.idlePeers(62, 65, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -498,7 +498,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.receiptThroughput } - return ps.idlePeers(63, 64, idle, throughput) + return ps.idlePeers(63, 65, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -512,7 +512,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { defer p.lock.RUnlock() return p.stateThroughput } - return ps.idlePeers(63, 64, idle, throughput) + return ps.idlePeers(63, 65, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the diff --git a/eth/fetcher/fetcher.go b/eth/fetcher/block_fetcher.go similarity index 84% rename from eth/fetcher/fetcher.go rename to eth/fetcher/block_fetcher.go index 28c532d9b..7395ec83d 100644 --- a/eth/fetcher/fetcher.go +++ b/eth/fetcher/block_fetcher.go @@ -14,7 +14,7 @@ // You should have received a copy of the GNU Lesser General Public License // along with the go-ethereum library. If not, see . -// Package fetcher contains the block announcement based synchronisation. +// Package fetcher contains the announcement based blocks or transaction synchronisation. package fetcher import ( @@ -30,13 +30,16 @@ import ( ) const ( - arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block is explicitly requested + arriveTimeout = 500 * time.Millisecond // Time allowance before an announced block/transaction is explicitly requested gatherSlack = 100 * time.Millisecond // Interval used to collate almost-expired announces with fetches - fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block - maxUncleDist = 7 // Maximum allowed backward distance from the chain head - maxQueueDist = 32 // Maximum allowed distance from the chain head to queue - hashLimit = 256 // Maximum number of unique blocks a peer may have announced - blockLimit = 64 // Maximum number of unique blocks a peer may have delivered + fetchTimeout = 5 * time.Second // Maximum allotted time to return an explicitly requested block/transaction +) + +const ( + maxUncleDist = 7 // Maximum allowed backward distance from the chain head + maxQueueDist = 32 // Maximum allowed distance from the chain head to queue + hashLimit = 256 // Maximum number of unique blocks a peer may have announced + blockLimit = 64 // Maximum number of unique blocks a peer may have delivered ) var ( @@ -67,9 +70,9 @@ type chainInsertFn func(types.Blocks) (int, error) // peerDropFn is a callback type for dropping a peer detected as malicious. type peerDropFn func(id string) -// announce is the hash notification of the availability of a new block in the +// blockAnnounce is the hash notification of the availability of a new block in the // network. -type announce struct { +type blockAnnounce struct { hash common.Hash // Hash of the block being announced number uint64 // Number of the block being announced (0 = unknown | old protocol) header *types.Header // Header of the block partially reassembled (new protocol) @@ -97,18 +100,18 @@ type bodyFilterTask struct { time time.Time // Arrival time of the blocks' contents } -// inject represents a schedules import operation. -type inject struct { +// blockInject represents a schedules import operation. +type blockInject struct { origin string block *types.Block } -// Fetcher is responsible for accumulating block announcements from various peers +// BlockFetcher is responsible for accumulating block announcements from various peers // and scheduling them for retrieval. -type Fetcher struct { +type BlockFetcher struct { // Various event channels - notify chan *announce - inject chan *inject + notify chan *blockAnnounce + inject chan *blockInject headerFilter chan chan *headerFilterTask bodyFilter chan chan *bodyFilterTask @@ -117,16 +120,16 @@ type Fetcher struct { quit chan struct{} // Announce states - announces map[string]int // Per peer announce counts to prevent memory exhaustion - announced map[common.Hash][]*announce // Announced blocks, scheduled for fetching - fetching map[common.Hash]*announce // Announced blocks, currently fetching - fetched map[common.Hash][]*announce // Blocks with headers fetched, scheduled for body retrieval - completing map[common.Hash]*announce // Blocks with headers, currently body-completing + announces map[string]int // Per peer blockAnnounce counts to prevent memory exhaustion + announced map[common.Hash][]*blockAnnounce // Announced blocks, scheduled for fetching + fetching map[common.Hash]*blockAnnounce // Announced blocks, currently fetching + fetched map[common.Hash][]*blockAnnounce // Blocks with headers fetched, scheduled for body retrieval + completing map[common.Hash]*blockAnnounce // Blocks with headers, currently body-completing // Block cache - queue *prque.Prque // Queue containing the import operations (block number sorted) - queues map[string]int // Per peer block counts to prevent memory exhaustion - queued map[common.Hash]*inject // Set of already queued blocks (to dedupe imports) + queue *prque.Prque // Queue containing the import operations (block number sorted) + queues map[string]int // Per peer block counts to prevent memory exhaustion + queued map[common.Hash]*blockInject // Set of already queued blocks (to dedupe imports) // Callbacks getBlock blockRetrievalFn // Retrieves a block from the local chain @@ -137,30 +140,30 @@ type Fetcher struct { dropPeer peerDropFn // Drops a peer for misbehaving // Testing hooks - announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the announce list + announceChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a hash from the blockAnnounce list queueChangeHook func(common.Hash, bool) // Method to call upon adding or deleting a block from the import queue fetchingHook func([]common.Hash) // Method to call upon starting a block (eth/61) or header (eth/62) fetch completingHook func([]common.Hash) // Method to call upon starting a block body fetch (eth/62) importedHook func(*types.Block) // Method to call upon successful block import (both eth/61 and eth/62) } -// New creates a block fetcher to retrieve blocks based on hash announcements. -func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *Fetcher { - return &Fetcher{ - notify: make(chan *announce), - inject: make(chan *inject), +// NewBlockFetcher creates a block fetcher to retrieve blocks based on hash announcements. +func NewBlockFetcher(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBlock blockBroadcasterFn, chainHeight chainHeightFn, insertChain chainInsertFn, dropPeer peerDropFn) *BlockFetcher { + return &BlockFetcher{ + notify: make(chan *blockAnnounce), + inject: make(chan *blockInject), headerFilter: make(chan chan *headerFilterTask), bodyFilter: make(chan chan *bodyFilterTask), done: make(chan common.Hash), quit: make(chan struct{}), announces: make(map[string]int), - announced: make(map[common.Hash][]*announce), - fetching: make(map[common.Hash]*announce), - fetched: make(map[common.Hash][]*announce), - completing: make(map[common.Hash]*announce), + announced: make(map[common.Hash][]*blockAnnounce), + fetching: make(map[common.Hash]*blockAnnounce), + fetched: make(map[common.Hash][]*blockAnnounce), + completing: make(map[common.Hash]*blockAnnounce), queue: prque.New(nil), queues: make(map[string]int), - queued: make(map[common.Hash]*inject), + queued: make(map[common.Hash]*blockInject), getBlock: getBlock, verifyHeader: verifyHeader, broadcastBlock: broadcastBlock, @@ -172,21 +175,21 @@ func New(getBlock blockRetrievalFn, verifyHeader headerVerifierFn, broadcastBloc // Start boots up the announcement based synchroniser, accepting and processing // hash notifications and block fetches until termination requested. -func (f *Fetcher) Start() { +func (f *BlockFetcher) Start() { go f.loop() } // Stop terminates the announcement based synchroniser, canceling all pending // operations. -func (f *Fetcher) Stop() { +func (f *BlockFetcher) Stop() { close(f.quit) } // Notify announces the fetcher of the potential availability of a new block in // the network. -func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, +func (f *BlockFetcher) Notify(peer string, hash common.Hash, number uint64, time time.Time, headerFetcher headerRequesterFn, bodyFetcher bodyRequesterFn) error { - block := &announce{ + block := &blockAnnounce{ hash: hash, number: number, time: time, @@ -203,8 +206,8 @@ func (f *Fetcher) Notify(peer string, hash common.Hash, number uint64, time time } // Enqueue tries to fill gaps the fetcher's future import queue. -func (f *Fetcher) Enqueue(peer string, block *types.Block) error { - op := &inject{ +func (f *BlockFetcher) Enqueue(peer string, block *types.Block) error { + op := &blockInject{ origin: peer, block: block, } @@ -218,7 +221,7 @@ func (f *Fetcher) Enqueue(peer string, block *types.Block) error { // FilterHeaders extracts all the headers that were explicitly requested by the fetcher, // returning those that should be handled differently. -func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { +func (f *BlockFetcher) FilterHeaders(peer string, headers []*types.Header, time time.Time) []*types.Header { log.Trace("Filtering headers", "peer", peer, "headers", len(headers)) // Send the filter channel to the fetcher @@ -246,7 +249,7 @@ func (f *Fetcher) FilterHeaders(peer string, headers []*types.Header, time time. // FilterBodies extracts all the block bodies that were explicitly requested by // the fetcher, returning those that should be handled differently. -func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { +func (f *BlockFetcher) FilterBodies(peer string, transactions [][]*types.Transaction, uncles [][]*types.Header, time time.Time) ([][]*types.Transaction, [][]*types.Header) { log.Trace("Filtering bodies", "peer", peer, "txs", len(transactions), "uncles", len(uncles)) // Send the filter channel to the fetcher @@ -274,7 +277,7 @@ func (f *Fetcher) FilterBodies(peer string, transactions [][]*types.Transaction, // Loop is the main fetcher loop, checking and processing various notification // events. -func (f *Fetcher) loop() { +func (f *BlockFetcher) loop() { // Iterate the block fetching until a quit is requested fetchTimer := time.NewTimer(0) completeTimer := time.NewTimer(0) @@ -289,7 +292,7 @@ func (f *Fetcher) loop() { // Import any queued blocks that could potentially fit height := f.chainHeight() for !f.queue.Empty() { - op := f.queue.PopItem().(*inject) + op := f.queue.PopItem().(*blockInject) hash := op.block.Hash() if f.queueChangeHook != nil { f.queueChangeHook(hash, false) @@ -313,24 +316,24 @@ func (f *Fetcher) loop() { // Wait for an outside event to occur select { case <-f.quit: - // Fetcher terminating, abort all operations + // BlockFetcher terminating, abort all operations return case notification := <-f.notify: // A block was announced, make sure the peer isn't DOSing us - propAnnounceInMeter.Mark(1) + blockAnnounceInMeter.Mark(1) count := f.announces[notification.origin] + 1 if count > hashLimit { log.Debug("Peer exceeded outstanding announces", "peer", notification.origin, "limit", hashLimit) - propAnnounceDOSMeter.Mark(1) + blockAnnounceDOSMeter.Mark(1) break } // If we have a valid block number, check that it's potentially useful if notification.number > 0 { if dist := int64(notification.number) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { log.Debug("Peer discarded announcement", "peer", notification.origin, "number", notification.number, "hash", notification.hash, "distance", dist) - propAnnounceDropMeter.Mark(1) + blockAnnounceDropMeter.Mark(1) break } } @@ -352,7 +355,7 @@ func (f *Fetcher) loop() { case op := <-f.inject: // A direct block insertion was requested, try and fill any pending gaps - propBroadcastInMeter.Mark(1) + blockBroadcastInMeter.Mark(1) f.enqueue(op.origin, op.block) case hash := <-f.done: @@ -439,7 +442,7 @@ func (f *Fetcher) loop() { // Split the batch of headers into unknown ones (to return to the caller), // known incomplete ones (requiring body retrievals) and completed blocks. - unknown, incomplete, complete := []*types.Header{}, []*announce{}, []*types.Block{} + unknown, incomplete, complete := []*types.Header{}, []*blockAnnounce{}, []*types.Block{} for _, header := range task.headers { hash := header.Hash() @@ -475,7 +478,7 @@ func (f *Fetcher) loop() { f.forgetHash(hash) } } else { - // Fetcher doesn't know about it, add to the return list + // BlockFetcher doesn't know about it, add to the return list unknown = append(unknown, header) } } @@ -562,8 +565,8 @@ func (f *Fetcher) loop() { } } -// rescheduleFetch resets the specified fetch timer to the next announce timeout. -func (f *Fetcher) rescheduleFetch(fetch *time.Timer) { +// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout. +func (f *BlockFetcher) rescheduleFetch(fetch *time.Timer) { // Short circuit if no blocks are announced if len(f.announced) == 0 { return @@ -579,7 +582,7 @@ func (f *Fetcher) rescheduleFetch(fetch *time.Timer) { } // rescheduleComplete resets the specified completion timer to the next fetch timeout. -func (f *Fetcher) rescheduleComplete(complete *time.Timer) { +func (f *BlockFetcher) rescheduleComplete(complete *time.Timer) { // Short circuit if no headers are fetched if len(f.fetched) == 0 { return @@ -596,27 +599,27 @@ func (f *Fetcher) rescheduleComplete(complete *time.Timer) { // enqueue schedules a new future import operation, if the block to be imported // has not yet been seen. -func (f *Fetcher) enqueue(peer string, block *types.Block) { +func (f *BlockFetcher) enqueue(peer string, block *types.Block) { hash := block.Hash() // Ensure the peer isn't DOSing us count := f.queues[peer] + 1 if count > blockLimit { log.Debug("Discarded propagated block, exceeded allowance", "peer", peer, "number", block.Number(), "hash", hash, "limit", blockLimit) - propBroadcastDOSMeter.Mark(1) + blockBroadcastDOSMeter.Mark(1) f.forgetHash(hash) return } // Discard any past or too distant blocks if dist := int64(block.NumberU64()) - int64(f.chainHeight()); dist < -maxUncleDist || dist > maxQueueDist { log.Debug("Discarded propagated block, too far away", "peer", peer, "number", block.Number(), "hash", hash, "distance", dist) - propBroadcastDropMeter.Mark(1) + blockBroadcastDropMeter.Mark(1) f.forgetHash(hash) return } // Schedule the block for future importing if _, ok := f.queued[hash]; !ok { - op := &inject{ + op := &blockInject{ origin: peer, block: block, } @@ -633,7 +636,7 @@ func (f *Fetcher) enqueue(peer string, block *types.Block) { // insert spawns a new goroutine to run a block insertion into the chain. If the // block's number is at the same height as the current import phase, it updates // the phase states accordingly. -func (f *Fetcher) insert(peer string, block *types.Block) { +func (f *BlockFetcher) insert(peer string, block *types.Block) { hash := block.Hash() // Run the import on a new thread @@ -651,7 +654,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) { switch err := f.verifyHeader(block.Header()); err { case nil: // All ok, quickly propagate to our peers - propBroadcastOutTimer.UpdateSince(block.ReceivedAt) + blockBroadcastOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, true) case consensus.ErrFutureBlock: @@ -669,7 +672,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) { return } // If import succeeded, broadcast the block - propAnnounceOutTimer.UpdateSince(block.ReceivedAt) + blockAnnounceOutTimer.UpdateSince(block.ReceivedAt) go f.broadcastBlock(block, false) // Invoke the testing hook if needed @@ -681,7 +684,7 @@ func (f *Fetcher) insert(peer string, block *types.Block) { // forgetHash removes all traces of a block announcement from the fetcher's // internal state. -func (f *Fetcher) forgetHash(hash common.Hash) { +func (f *BlockFetcher) forgetHash(hash common.Hash) { // Remove all pending announces and decrement DOS counters for _, announce := range f.announced[hash] { f.announces[announce.origin]-- @@ -723,7 +726,7 @@ func (f *Fetcher) forgetHash(hash common.Hash) { // forgetBlock removes all traces of a queued block from the fetcher's internal // state. -func (f *Fetcher) forgetBlock(hash common.Hash) { +func (f *BlockFetcher) forgetBlock(hash common.Hash) { if insert := f.queued[hash]; insert != nil { f.queues[insert.origin]-- if f.queues[insert.origin] == 0 { diff --git a/eth/fetcher/fetcher_test.go b/eth/fetcher/block_fetcher_test.go similarity index 99% rename from eth/fetcher/fetcher_test.go rename to eth/fetcher/block_fetcher_test.go index 83172c534..038ead12e 100644 --- a/eth/fetcher/fetcher_test.go +++ b/eth/fetcher/block_fetcher_test.go @@ -76,7 +76,7 @@ func makeChain(n int, seed byte, parent *types.Block) ([]common.Hash, map[common // fetcherTester is a test simulator for mocking out local block chain. type fetcherTester struct { - fetcher *Fetcher + fetcher *BlockFetcher hashes []common.Hash // Hash chain belonging to the tester blocks map[common.Hash]*types.Block // Blocks belonging to the tester @@ -92,7 +92,7 @@ func newTester() *fetcherTester { blocks: map[common.Hash]*types.Block{genesis.Hash(): genesis}, drops: make(map[string]bool), } - tester.fetcher = New(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer) + tester.fetcher = NewBlockFetcher(tester.getBlock, tester.verifyHeader, tester.broadcastBlock, tester.chainHeight, tester.insertChain, tester.dropPeer) tester.fetcher.Start() return tester diff --git a/eth/fetcher/metrics.go b/eth/fetcher/metrics.go index d68d12f00..b75889938 100644 --- a/eth/fetcher/metrics.go +++ b/eth/fetcher/metrics.go @@ -23,15 +23,15 @@ import ( ) var ( - propAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/in", nil) - propAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/announces/out", nil) - propAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/drop", nil) - propAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/announces/dos", nil) + blockAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/in", nil) + blockAnnounceOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/announces/out", nil) + blockAnnounceDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/drop", nil) + blockAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/announces/dos", nil) - propBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/in", nil) - propBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/broadcasts/out", nil) - propBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/drop", nil) - propBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/broadcasts/dos", nil) + blockBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/in", nil) + blockBroadcastOutTimer = metrics.NewRegisteredTimer("eth/fetcher/prop/block/broadcasts/out", nil) + blockBroadcastDropMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/drop", nil) + blockBroadcastDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/block/broadcasts/dos", nil) headerFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/headers", nil) bodyFetchMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/bodies", nil) @@ -40,4 +40,15 @@ var ( headerFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/headers/out", nil) bodyFilterInMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/in", nil) bodyFilterOutMeter = metrics.NewRegisteredMeter("eth/fetcher/filter/bodies/out", nil) + + txAnnounceInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/in", nil) + txAnnounceDOSMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/dos", nil) + txAnnounceSkipMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/skip", nil) + txAnnounceUnderpriceMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/announces/underprice", nil) + txBroadcastInMeter = metrics.NewRegisteredMeter("eth/fetcher/prop/transaction/broadcasts/in", nil) + txFetchOutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/out", nil) + txFetchSuccessMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/success", nil) + txFetchTimeoutMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/timeout", nil) + txFetchInvalidMeter = metrics.NewRegisteredMeter("eth/fetcher/fetch/transaction/invalid", nil) + txFetchDurationTimer = metrics.NewRegisteredTimer("eth/fetcher/fetch/transaction/duration", nil) ) diff --git a/eth/fetcher/tx_fetcher.go b/eth/fetcher/tx_fetcher.go new file mode 100644 index 000000000..1dabb0819 --- /dev/null +++ b/eth/fetcher/tx_fetcher.go @@ -0,0 +1,319 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package fetcher + +import ( + "math/rand" + "time" + + mapset "github.com/deckarep/golang-set" + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/log" +) + +var ( + // txAnnounceLimit is the maximum number of unique transaction a peer + // can announce in a short time. + txAnnounceLimit = 4096 + + // txFetchTimeout is the maximum allotted time to return an explicitly + // requested transaction. + txFetchTimeout = 5 * time.Second + + // MaxTransactionFetch is the maximum transaction number can be fetched + // in one request. The rationale to pick this value is: + // In eth protocol, the softResponseLimit is 2MB. Nowdays according to + // Etherscan the average transaction size is around 200B, so in theory + // we can include lots of transaction in a single protocol packet. However + // the maximum size of a single transaction is raised to 128KB, so pick + // a middle value here to ensure we can maximize the efficiency of the + // retrieval and response size overflow won't happen in most cases. + MaxTransactionFetch = 256 + + // underpriceSetSize is the size of underprice set which used for maintaining + // the set of underprice transactions. + underpriceSetSize = 4096 +) + +// txAnnounce is the notification of the availability of a single +// new transaction in the network. +type txAnnounce struct { + origin string // Identifier of the peer originating the notification + time time.Time // Timestamp of the announcement + fetchTxs func([]common.Hash) // Callback for retrieving transaction from specified peer +} + +// txsAnnounce is the notification of the availability of a batch +// of new transactions in the network. +type txsAnnounce struct { + hashes []common.Hash // Batch of transaction hashes being announced + origin string // Identifier of the peer originating the notification + time time.Time // Timestamp of the announcement + fetchTxs func([]common.Hash) // Callback for retrieving transaction from specified peer +} + +// TxFetcher is responsible for retrieving new transaction based +// on the announcement. +type TxFetcher struct { + notify chan *txsAnnounce + cleanup chan []common.Hash + quit chan struct{} + + // Announce states + announces map[string]int // Per peer transaction announce counts to prevent memory exhaustion + announced map[common.Hash][]*txAnnounce // Announced transactions, scheduled for fetching + fetching map[common.Hash]*txAnnounce // Announced transactions, currently fetching + underpriced mapset.Set // Transaction set whose price is too low for accepting + + // Callbacks + hasTx func(common.Hash) bool // Retrieves a tx from the local txpool + addTxs func([]*types.Transaction) []error // Insert a batch of transactions into local txpool + dropPeer func(string) // Drop the specified peer + + // Hooks + announceHook func([]common.Hash) // Hook which is called when a batch transactions are announced + importTxsHook func([]*types.Transaction) // Hook which is called when a batch of transactions are imported. + dropHook func(string) // Hook which is called when a peer is dropped + cleanupHook func([]common.Hash) // Hook which is called when internal status is cleaned + rejectUnderprice func(common.Hash) // Hook which is called when underprice transaction is rejected +} + +// NewTxFetcher creates a transaction fetcher to retrieve transaction +// based on hash announcements. +func NewTxFetcher(hasTx func(common.Hash) bool, addTxs func([]*types.Transaction) []error, dropPeer func(string)) *TxFetcher { + return &TxFetcher{ + notify: make(chan *txsAnnounce), + cleanup: make(chan []common.Hash), + quit: make(chan struct{}), + announces: make(map[string]int), + announced: make(map[common.Hash][]*txAnnounce), + fetching: make(map[common.Hash]*txAnnounce), + underpriced: mapset.NewSet(), + hasTx: hasTx, + addTxs: addTxs, + dropPeer: dropPeer, + } +} + +// Notify announces the fetcher of the potential availability of a +// new transaction in the network. +func (f *TxFetcher) Notify(peer string, hashes []common.Hash, time time.Time, fetchTxs func([]common.Hash)) error { + announce := &txsAnnounce{ + hashes: hashes, + time: time, + origin: peer, + fetchTxs: fetchTxs, + } + select { + case f.notify <- announce: + return nil + case <-f.quit: + return errTerminated + } +} + +// EnqueueTxs imports a batch of received transaction into fetcher. +func (f *TxFetcher) EnqueueTxs(peer string, txs []*types.Transaction) error { + var ( + drop bool + hashes []common.Hash + ) + errs := f.addTxs(txs) + for i, err := range errs { + if err != nil { + // Drop peer if the received transaction isn't signed properly. + drop = (drop || err == core.ErrInvalidSender) + txFetchInvalidMeter.Mark(1) + + // Track the transaction hash if the price is too low for us. + // Avoid re-request this transaction when we receive another + // announcement. + if err == core.ErrUnderpriced { + for f.underpriced.Cardinality() >= underpriceSetSize { + f.underpriced.Pop() + } + f.underpriced.Add(txs[i].Hash()) + } + } + hashes = append(hashes, txs[i].Hash()) + } + if f.importTxsHook != nil { + f.importTxsHook(txs) + } + // Drop the peer if some transaction failed signature verification. + // We can regard this peer is trying to DOS us by feeding lots of + // random hashes. + if drop { + f.dropPeer(peer) + if f.dropHook != nil { + f.dropHook(peer) + } + } + select { + case f.cleanup <- hashes: + return nil + case <-f.quit: + return errTerminated + } +} + +// Start boots up the announcement based synchroniser, accepting and processing +// hash notifications and block fetches until termination requested. +func (f *TxFetcher) Start() { + go f.loop() +} + +// Stop terminates the announcement based synchroniser, canceling all pending +// operations. +func (f *TxFetcher) Stop() { + close(f.quit) +} + +func (f *TxFetcher) loop() { + fetchTimer := time.NewTimer(0) + + for { + // Clean up any expired transaction fetches. + // There are many cases can lead to it: + // * We send the request to busy peer which can reply immediately + // * We send the request to malicious peer which doesn't reply deliberately + // * We send the request to normal peer for a batch of transaction, but some + // transactions have been included into blocks. According to EIP these txs + // won't be included. + // But it's fine to delete the fetching record and reschedule fetching iff we + // receive the annoucement again. + for hash, announce := range f.fetching { + if time.Since(announce.time) > txFetchTimeout { + delete(f.fetching, hash) + txFetchTimeoutMeter.Mark(1) + } + } + select { + case anno := <-f.notify: + txAnnounceInMeter.Mark(int64(len(anno.hashes))) + + // Drop the new announce if there are too many accumulated. + count := f.announces[anno.origin] + len(anno.hashes) + if count > txAnnounceLimit { + txAnnounceDOSMeter.Mark(int64(count - txAnnounceLimit)) + break + } + f.announces[anno.origin] = count + + // All is well, schedule the announce if transaction is not yet downloading + empty := len(f.announced) == 0 + for _, hash := range anno.hashes { + if _, ok := f.fetching[hash]; ok { + continue + } + if f.underpriced.Contains(hash) { + txAnnounceUnderpriceMeter.Mark(1) + if f.rejectUnderprice != nil { + f.rejectUnderprice(hash) + } + continue + } + f.announced[hash] = append(f.announced[hash], &txAnnounce{ + origin: anno.origin, + time: anno.time, + fetchTxs: anno.fetchTxs, + }) + } + if empty && len(f.announced) > 0 { + f.reschedule(fetchTimer) + } + if f.announceHook != nil { + f.announceHook(anno.hashes) + } + case <-fetchTimer.C: + // At least one tx's timer ran out, check for needing retrieval + request := make(map[string][]common.Hash) + + for hash, announces := range f.announced { + if time.Since(announces[0].time) > arriveTimeout-gatherSlack { + // Pick a random peer to retrieve from, reset all others + announce := announces[rand.Intn(len(announces))] + f.forgetHash(hash) + + // Skip fetching if we already receive the transaction. + if f.hasTx(hash) { + txAnnounceSkipMeter.Mark(1) + continue + } + // If the transaction still didn't arrive, queue for fetching + request[announce.origin] = append(request[announce.origin], hash) + f.fetching[hash] = announce + } + } + // Send out all block header requests + for peer, hashes := range request { + log.Trace("Fetching scheduled transactions", "peer", peer, "txs", hashes) + fetchTxs := f.fetching[hashes[0]].fetchTxs + fetchTxs(hashes) + txFetchOutMeter.Mark(int64(len(hashes))) + } + // Schedule the next fetch if blocks are still pending + f.reschedule(fetchTimer) + case hashes := <-f.cleanup: + for _, hash := range hashes { + f.forgetHash(hash) + anno, exist := f.fetching[hash] + if !exist { + txBroadcastInMeter.Mark(1) // Directly transaction propagation + continue + } + txFetchDurationTimer.UpdateSince(anno.time) + txFetchSuccessMeter.Mark(1) + delete(f.fetching, hash) + } + if f.cleanupHook != nil { + f.cleanupHook(hashes) + } + case <-f.quit: + return + } + } +} + +// rescheduleFetch resets the specified fetch timer to the next blockAnnounce timeout. +func (f *TxFetcher) reschedule(fetch *time.Timer) { + // Short circuit if no transactions are announced + if len(f.announced) == 0 { + return + } + // Otherwise find the earliest expiring announcement + earliest := time.Now() + for _, announces := range f.announced { + if earliest.After(announces[0].time) { + earliest = announces[0].time + } + } + fetch.Reset(arriveTimeout - time.Since(earliest)) +} + +func (f *TxFetcher) forgetHash(hash common.Hash) { + // Remove all pending announces and decrement DOS counters + for _, announce := range f.announced[hash] { + f.announces[announce.origin]-- + if f.announces[announce.origin] <= 0 { + delete(f.announces, announce.origin) + } + } + delete(f.announced, hash) +} diff --git a/eth/fetcher/tx_fetcher_test.go b/eth/fetcher/tx_fetcher_test.go new file mode 100644 index 000000000..26f24f3f3 --- /dev/null +++ b/eth/fetcher/tx_fetcher_test.go @@ -0,0 +1,318 @@ +// Copyright 2020 The go-ethereum Authors +// This file is part of the go-ethereum library. +// +// The go-ethereum library is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// The go-ethereum library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with the go-ethereum library. If not, see . + +package fetcher + +import ( + "crypto/ecdsa" + "math/big" + "math/rand" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core" + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/crypto" +) + +func init() { + rand.Seed(int64(time.Now().Nanosecond())) + + txAnnounceLimit = 64 + MaxTransactionFetch = 16 +} + +func makeTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction { + var txs []*types.Transaction + + for i := 0; i < target; i++ { + random := rand.Uint32() + tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil) + tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), key) + txs = append(txs, tx) + } + return txs +} + +func makeUnsignedTransactions(key *ecdsa.PrivateKey, target int) []*types.Transaction { + var txs []*types.Transaction + + for i := 0; i < target; i++ { + random := rand.Uint32() + tx := types.NewTransaction(uint64(random), common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(random)), 100, big.NewInt(int64(random)), nil) + txs = append(txs, tx) + } + return txs +} + +type txfetcherTester struct { + fetcher *TxFetcher + + priceLimit *big.Int + sender *ecdsa.PrivateKey + senderAddr common.Address + signer types.Signer + txs map[common.Hash]*types.Transaction + dropped map[string]struct{} + lock sync.RWMutex +} + +func newTxFetcherTester() *txfetcherTester { + key, _ := crypto.GenerateKey() + addr := crypto.PubkeyToAddress(key.PublicKey) + t := &txfetcherTester{ + sender: key, + senderAddr: addr, + signer: types.NewEIP155Signer(big.NewInt(1)), + txs: make(map[common.Hash]*types.Transaction), + dropped: make(map[string]struct{}), + } + t.fetcher = NewTxFetcher(t.hasTx, t.addTxs, t.dropPeer) + t.fetcher.Start() + return t +} + +func (t *txfetcherTester) hasTx(hash common.Hash) bool { + t.lock.RLock() + defer t.lock.RUnlock() + + return t.txs[hash] != nil +} + +func (t *txfetcherTester) addTxs(txs []*types.Transaction) []error { + t.lock.Lock() + defer t.lock.Unlock() + + var errors []error + for _, tx := range txs { + // Make sure the transaction is signed properly + _, err := types.Sender(t.signer, tx) + if err != nil { + errors = append(errors, core.ErrInvalidSender) + continue + } + // Make sure the price is high enough to accpet + if t.priceLimit != nil && tx.GasPrice().Cmp(t.priceLimit) < 0 { + errors = append(errors, core.ErrUnderpriced) + continue + } + t.txs[tx.Hash()] = tx + errors = append(errors, nil) + } + return errors +} + +func (t *txfetcherTester) dropPeer(id string) { + t.lock.Lock() + defer t.lock.Unlock() + + t.dropped[id] = struct{}{} +} + +// makeTxFetcher retrieves a batch of transaction associated with a simulated peer. +func (t *txfetcherTester) makeTxFetcher(peer string, txs []*types.Transaction) func(hashes []common.Hash) { + closure := make(map[common.Hash]*types.Transaction) + for _, tx := range txs { + closure[tx.Hash()] = tx + } + return func(hashes []common.Hash) { + var txs []*types.Transaction + for _, hash := range hashes { + tx := closure[hash] + if tx == nil { + continue + } + txs = append(txs, tx) + } + // Return on a new thread + go t.fetcher.EnqueueTxs(peer, txs) + } +} + +func TestSequentialTxAnnouncements(t *testing.T) { + tester := newTxFetcherTester() + txs := makeTransactions(tester.sender, txAnnounceLimit) + + retrieveTxs := tester.makeTxFetcher("peer", txs) + + newTxsCh := make(chan struct{}) + tester.fetcher.importTxsHook = func(transactions []*types.Transaction) { + newTxsCh <- struct{}{} + } + for _, tx := range txs { + tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), retrieveTxs) + select { + case <-newTxsCh: + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } + } + if len(tester.txs) != len(txs) { + t.Fatalf("Imported transaction number mismatch, want %d, got %d", len(txs), len(tester.txs)) + } +} + +func TestConcurrentAnnouncements(t *testing.T) { + tester := newTxFetcherTester() + txs := makeTransactions(tester.sender, txAnnounceLimit) + + txFetcherFn1 := tester.makeTxFetcher("peer1", txs) + txFetcherFn2 := tester.makeTxFetcher("peer2", txs) + + var ( + count uint32 + done = make(chan struct{}) + ) + tester.fetcher.importTxsHook = func(transactions []*types.Transaction) { + atomic.AddUint32(&count, uint32(len(transactions))) + if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) { + done <- struct{}{} + } + } + for _, tx := range txs { + tester.fetcher.Notify("peer1", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn1) + tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout+time.Millisecond), txFetcherFn2) + tester.fetcher.Notify("peer2", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout-time.Millisecond), txFetcherFn2) + } + select { + case <-done: + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } +} + +func TestBatchAnnouncements(t *testing.T) { + tester := newTxFetcherTester() + txs := makeTransactions(tester.sender, txAnnounceLimit) + + retrieveTxs := tester.makeTxFetcher("peer", txs) + + var count uint32 + var done = make(chan struct{}) + tester.fetcher.importTxsHook = func(txs []*types.Transaction) { + atomic.AddUint32(&count, uint32(len(txs))) + + if atomic.LoadUint32(&count) >= uint32(txAnnounceLimit) { + done <- struct{}{} + } + } + // Send all announces which exceeds the limit. + var hashes []common.Hash + for _, tx := range txs { + hashes = append(hashes, tx.Hash()) + } + tester.fetcher.Notify("peer", hashes, time.Now(), retrieveTxs) + + select { + case <-done: + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } +} + +func TestPropagationAfterAnnounce(t *testing.T) { + tester := newTxFetcherTester() + txs := makeTransactions(tester.sender, txAnnounceLimit) + + var cleaned = make(chan struct{}) + tester.fetcher.cleanupHook = func(hashes []common.Hash) { + cleaned <- struct{}{} + } + retrieveTxs := tester.makeTxFetcher("peer", txs) + for _, tx := range txs { + tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), retrieveTxs) + tester.fetcher.EnqueueTxs("peer", []*types.Transaction{tx}) + + // It's ok to read the map directly since no write + // will happen in the same time. + <-cleaned + if len(tester.fetcher.announced) != 0 { + t.Fatalf("Announcement should be cleaned, got %d", len(tester.fetcher.announced)) + } + } +} + +func TestEnqueueTransactions(t *testing.T) { + tester := newTxFetcherTester() + txs := makeTransactions(tester.sender, txAnnounceLimit) + + done := make(chan struct{}) + tester.fetcher.importTxsHook = func(transactions []*types.Transaction) { + if len(transactions) == txAnnounceLimit { + done <- struct{}{} + } + } + go tester.fetcher.EnqueueTxs("peer", txs) + select { + case <-done: + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } +} + +func TestInvalidTxAnnounces(t *testing.T) { + tester := newTxFetcherTester() + + var txs []*types.Transaction + txs = append(txs, makeUnsignedTransactions(tester.sender, 1)...) + txs = append(txs, makeTransactions(tester.sender, 1)...) + + txFetcherFn := tester.makeTxFetcher("peer", txs) + + dropped := make(chan string, 1) + tester.fetcher.dropHook = func(s string) { dropped <- s } + + for _, tx := range txs { + tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now(), txFetcherFn) + } + select { + case s := <-dropped: + if s != "peer" { + t.Fatalf("invalid dropped peer") + } + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } +} + +func TestRejectUnderpriced(t *testing.T) { + tester := newTxFetcherTester() + tester.priceLimit = big.NewInt(10000) + + done := make(chan struct{}) + tester.fetcher.importTxsHook = func([]*types.Transaction) { done <- struct{}{} } + reject := make(chan struct{}) + tester.fetcher.rejectUnderprice = func(common.Hash) { reject <- struct{}{} } + + tx := types.NewTransaction(0, common.Address{0x1, 0x2, 0x3}, big.NewInt(int64(100)), 100, big.NewInt(int64(100)), nil) + tx, _ = types.SignTx(tx, types.NewEIP155Signer(big.NewInt(1)), tester.sender) + txFetcherFn := tester.makeTxFetcher("peer", []*types.Transaction{tx}) + + // Send the announcement first time + tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn) + <-done + + // Resend the announcement, shouldn't schedule fetching this time + tester.fetcher.Notify("peer", []common.Hash{tx.Hash()}, time.Now().Add(-arriveTimeout), txFetcherFn) + select { + case <-reject: + case <-time.NewTimer(time.Second).C: + t.Fatalf("timeout") + } +} diff --git a/eth/handler.go b/eth/handler.go index ae2b764cf..d527b15d1 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -77,9 +77,10 @@ type ProtocolManager struct { blockchain *core.BlockChain maxPeers int - downloader *downloader.Downloader - fetcher *fetcher.Fetcher - peers *peerSet + downloader *downloader.Downloader + blockFetcher *fetcher.BlockFetcher + txFetcher *fetcher.TxFetcher + peers *peerSet eventMux *event.TypeMux txsCh chan core.NewTxsEvent @@ -97,6 +98,9 @@ type ProtocolManager struct { // wait group is used for graceful shutdowns during downloading // and processing wg sync.WaitGroup + + // Test fields or hooks + broadcastTxAnnouncesOnly bool // Testing field, disable transaction propagation } // NewProtocolManager returns a new Ethereum sub protocol manager. The Ethereum sub protocol manages peers capable @@ -187,7 +191,8 @@ func NewProtocolManager(config *params.ChainConfig, checkpoint *params.TrustedCh } return n, err } - manager.fetcher = fetcher.New(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + manager.blockFetcher = fetcher.NewBlockFetcher(blockchain.GetBlockByHash, validator, manager.BroadcastBlock, heighter, inserter, manager.removePeer) + manager.txFetcher = fetcher.NewTxFetcher(txpool.Has, txpool.AddRemotes, manager.removePeer) return manager, nil } @@ -203,7 +208,7 @@ func (pm *ProtocolManager) makeProtocol(version uint) p2p.Protocol { Version: version, Length: length, Run: func(p *p2p.Peer, rw p2p.MsgReadWriter) error { - peer := pm.newPeer(int(version), p, rw) + peer := pm.newPeer(int(version), p, rw, pm.txpool.Get) select { case pm.newPeerCh <- peer: pm.wg.Add(1) @@ -286,8 +291,8 @@ func (pm *ProtocolManager) Stop() { log.Info("Ethereum protocol stopped") } -func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { - return newPeer(pv, p, newMeteredMsgWriter(rw)) +func (pm *ProtocolManager) newPeer(pv int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer { + return newPeer(pv, p, newMeteredMsgWriter(rw), getPooledTx) } // handle is the callback invoked to manage the life cycle of an eth peer. When @@ -514,7 +519,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { p.Log().Debug("Whitelist block verified", "number", headers[0].Number.Uint64(), "hash", want) } // Irrelevant of the fork checks, send the header to the fetcher just in case - headers = pm.fetcher.FilterHeaders(p.id, headers, time.Now()) + headers = pm.blockFetcher.FilterHeaders(p.id, headers, time.Now()) } if len(headers) > 0 || !filter { err := pm.downloader.DeliverHeaders(p.id, headers) @@ -567,7 +572,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Filter out any explicitly requested bodies, deliver the rest to the downloader filter := len(transactions) > 0 || len(uncles) > 0 if filter { - transactions, uncles = pm.fetcher.FilterBodies(p.id, transactions, uncles, time.Now()) + transactions, uncles = pm.blockFetcher.FilterBodies(p.id, transactions, uncles, time.Now()) } if len(transactions) > 0 || len(uncles) > 0 || !filter { err := pm.downloader.DeliverBodies(p.id, transactions, uncles) @@ -678,7 +683,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } for _, block := range unknown { - pm.fetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) + pm.blockFetcher.Notify(p.id, block.Hash, block.Number, time.Now(), p.RequestOneHeader, p.RequestBodies) } case msg.Code == NewBlockMsg: @@ -703,7 +708,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { // Mark the peer as owning the block and schedule it for import p.MarkBlock(request.Block.Hash()) - pm.fetcher.Enqueue(p.id, request.Block) + pm.blockFetcher.Enqueue(p.id, request.Block) // Assuming the block is importable by the peer, but possibly not yet done so, // calculate the head hash and TD that the peer truly must have. @@ -724,6 +729,66 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } } + case msg.Code == NewPooledTransactionHashesMsg && p.version >= eth65: + // New transaction announcement arrived, make sure we have + // a valid and fresh chain to handle them + if atomic.LoadUint32(&pm.acceptTxs) == 0 { + break + } + var hashes []common.Hash + if err := msg.Decode(&hashes); err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Schedule all the unknown hashes for retrieval + var unknown []common.Hash + for _, hash := range hashes { + // Mark the hashes as present at the remote node + p.MarkTransaction(hash) + + // Filter duplicated transaction announcement. + // Notably we only dedupliate announcement in txpool, check the rationale + // behind in EIP https://github.com/ethereum/EIPs/pull/2464. + if pm.txpool.Has(hash) { + continue + } + unknown = append(unknown, hash) + } + pm.txFetcher.Notify(p.id, unknown, time.Now(), p.AsyncRequestTxs) + + case msg.Code == GetPooledTransactionsMsg && p.version >= eth65: + // Decode the retrieval message + msgStream := rlp.NewStream(msg.Payload, uint64(msg.Size)) + if _, err := msgStream.List(); err != nil { + return err + } + // Gather transactions until the fetch or network limits is reached + var ( + hash common.Hash + bytes int + txs []rlp.RawValue + ) + for bytes < softResponseLimit { + // Retrieve the hash of the next block + if err := msgStream.Decode(&hash); err == rlp.EOL { + break + } else if err != nil { + return errResp(ErrDecode, "msg %v: %v", msg, err) + } + // Retrieve the requested transaction, skipping if unknown to us + tx := pm.txpool.Get(hash) + if tx == nil { + continue + } + // If known, encode and queue for response packet + if encoded, err := rlp.EncodeToBytes(tx); err != nil { + log.Error("Failed to encode transaction", "err", err) + } else { + txs = append(txs, encoded) + bytes += len(encoded) + } + } + return p.SendTransactionRLP(txs) + case msg.Code == TxMsg: // Transactions arrived, make sure we have a valid and fresh chain to handle them if atomic.LoadUint32(&pm.acceptTxs) == 0 { @@ -741,7 +806,7 @@ func (pm *ProtocolManager) handleMsg(p *peer) error { } p.MarkTransaction(tx.Hash()) } - pm.txpool.AddRemotes(txs) + pm.txFetcher.EnqueueTxs(p.id, txs) default: return errResp(ErrInvalidMsgCode, "%v", msg.Code) @@ -791,20 +856,48 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { // BroadcastTxs will propagate a batch of transactions to all peers which are not known to // already have the given transaction. -func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions) { - var txset = make(map[*peer]types.Transactions) - +func (pm *ProtocolManager) BroadcastTxs(txs types.Transactions, propagate bool) { + var ( + txset = make(map[*peer][]common.Hash) + annos = make(map[*peer][]common.Hash) + ) // Broadcast transactions to a batch of peers not knowing about it + if propagate { + for _, tx := range txs { + peers := pm.peers.PeersWithoutTx(tx.Hash()) + + // Send the block to a subset of our peers + transferLen := int(math.Sqrt(float64(len(peers)))) + if transferLen < minBroadcastPeers { + transferLen = minBroadcastPeers + } + if transferLen > len(peers) { + transferLen = len(peers) + } + transfer := peers[:transferLen] + for _, peer := range transfer { + txset[peer] = append(txset[peer], tx.Hash()) + } + log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) + } + for peer, hashes := range txset { + peer.AsyncSendTransactions(hashes) + } + return + } + // Otherwise only broadcast the announcement to peers for _, tx := range txs { peers := pm.peers.PeersWithoutTx(tx.Hash()) for _, peer := range peers { - txset[peer] = append(txset[peer], tx) + annos[peer] = append(annos[peer], tx.Hash()) } - log.Trace("Broadcast transaction", "hash", tx.Hash(), "recipients", len(peers)) } - // FIXME include this again: peers = peers[:int(math.Sqrt(float64(len(peers))))] - for peer, txs := range txset { - peer.AsyncSendTransactions(txs) + for peer, hashes := range annos { + if peer.version >= eth65 { + peer.AsyncSendTransactionHashes(hashes) + } else { + peer.AsyncSendTransactions(hashes) + } } } @@ -823,7 +916,13 @@ func (pm *ProtocolManager) txBroadcastLoop() { for { select { case event := <-pm.txsCh: - pm.BroadcastTxs(event.Txs) + // For testing purpose only, disable propagation + if pm.broadcastTxAnnouncesOnly { + pm.BroadcastTxs(event.Txs, false) + continue + } + pm.BroadcastTxs(event.Txs, true) // First propagate transactions to peers + pm.BroadcastTxs(event.Txs, false) // Only then announce to the rest // Err() channel will be closed when unsubscribing. case <-pm.txsSub.Err(): diff --git a/eth/handler_test.go b/eth/handler_test.go index 354cbc068..97613a983 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -495,7 +495,7 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } - pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), new(testTxPool), ethash.NewFaker(), blockchain, db, 1, nil) + pm, err := NewProtocolManager(config, cht, syncmode, DefaultConfig.NetworkId, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, ethash.NewFaker(), blockchain, db, 1, nil) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } @@ -582,7 +582,7 @@ func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) { if err != nil { t.Fatalf("failed to create new blockchain: %v", err) } - pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db, 1, nil) + pm, err := NewProtocolManager(config, nil, downloader.FullSync, DefaultConfig.NetworkId, evmux, &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, pow, blockchain, db, 1, nil) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) } diff --git a/eth/helper_test.go b/eth/helper_test.go index e66910334..bec37e16c 100644 --- a/eth/helper_test.go +++ b/eth/helper_test.go @@ -68,7 +68,7 @@ func newTestProtocolManager(mode downloader.SyncMode, blocks int, generator func if _, err := blockchain.InsertChain(chain); err != nil { panic(err) } - pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx}, engine, blockchain, db, 1, nil) + pm, err := NewProtocolManager(gspec.Config, nil, mode, DefaultConfig.NetworkId, evmux, &testTxPool{added: newtx, pool: make(map[common.Hash]*types.Transaction)}, engine, blockchain, db, 1, nil) if err != nil { return nil, nil, err } @@ -91,22 +91,43 @@ func newTestProtocolManagerMust(t *testing.T, mode downloader.SyncMode, blocks i // testTxPool is a fake, helper transaction pool for testing purposes type testTxPool struct { txFeed event.Feed - pool []*types.Transaction // Collection of all transactions - added chan<- []*types.Transaction // Notification channel for new transactions + pool map[common.Hash]*types.Transaction // Hash map of collected transactions + added chan<- []*types.Transaction // Notification channel for new transactions lock sync.RWMutex // Protects the transaction pool } +// Has returns an indicator whether txpool has a transaction +// cached with the given hash. +func (p *testTxPool) Has(hash common.Hash) bool { + p.lock.Lock() + defer p.lock.Unlock() + + return p.pool[hash] != nil +} + +// Get retrieves the transaction from local txpool with given +// tx hash. +func (p *testTxPool) Get(hash common.Hash) *types.Transaction { + p.lock.Lock() + defer p.lock.Unlock() + + return p.pool[hash] +} + // AddRemotes appends a batch of transactions to the pool, and notifies any // listeners if the addition channel is non nil func (p *testTxPool) AddRemotes(txs []*types.Transaction) []error { p.lock.Lock() defer p.lock.Unlock() - p.pool = append(p.pool, txs...) + for _, tx := range txs { + p.pool[tx.Hash()] = tx + } if p.added != nil { p.added <- txs } + p.txFeed.Send(core.NewTxsEvent{Txs: txs}) return make([]error, len(txs)) } @@ -153,7 +174,7 @@ func newTestPeer(name string, version int, pm *ProtocolManager, shake bool) (*te var id enode.ID rand.Read(id[:]) - peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net) + peer := pm.newPeer(version, p2p.NewPeer(id, name, nil), net, pm.txpool.Get) // Start the peer on a new thread errc := make(chan error, 1) @@ -191,7 +212,7 @@ func (p *testPeer) handshake(t *testing.T, td *big.Int, head common.Hash, genesi CurrentBlock: head, GenesisBlock: genesis, } - case p.version == eth64: + case p.version >= eth64: msg = &statusData{ ProtocolVersion: uint32(p.version), NetworkID: DefaultConfig.NetworkId, diff --git a/eth/peer.go b/eth/peer.go index 0beec1d84..f4b939b71 100644 --- a/eth/peer.go +++ b/eth/peer.go @@ -27,6 +27,7 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/core/forkid" "github.com/ethereum/go-ethereum/core/types" + "github.com/ethereum/go-ethereum/eth/fetcher" "github.com/ethereum/go-ethereum/p2p" "github.com/ethereum/go-ethereum/rlp" ) @@ -41,24 +42,39 @@ const ( maxKnownTxs = 32768 // Maximum transactions hashes to keep in the known list (prevent DOS) maxKnownBlocks = 1024 // Maximum block hashes to keep in the known list (prevent DOS) - // maxQueuedTxs is the maximum number of transaction lists to queue up before - // dropping broadcasts. This is a sensitive number as a transaction list might - // contain a single transaction, or thousands. - maxQueuedTxs = 128 + // maxQueuedTxs is the maximum number of transactions to queue up before dropping + // broadcasts. + maxQueuedTxs = 4096 - // maxQueuedProps is the maximum number of block propagations to queue up before + // maxQueuedTxAnns is the maximum number of transaction announcements to queue up + // before dropping broadcasts. + maxQueuedTxAnns = 4096 + + // maxQueuedTxRetrieval is the maximum number of tx retrieval requests to queue up + // before dropping requests. + maxQueuedTxRetrieval = 4096 + + // maxQueuedBlocks is the maximum number of block propagations to queue up before // dropping broadcasts. There's not much point in queueing stale blocks, so a few // that might cover uncles should be enough. - maxQueuedProps = 4 + maxQueuedBlocks = 4 - // maxQueuedAnns is the maximum number of block announcements to queue up before + // maxQueuedBlockAnns is the maximum number of block announcements to queue up before // dropping broadcasts. Similarly to block propagations, there's no point to queue // above some healthy uncle limit, so use that. - maxQueuedAnns = 4 + maxQueuedBlockAnns = 4 handshakeTimeout = 5 * time.Second ) +// max is a helper function which returns the larger of the two given integers. +func max(a, b int) int { + if a > b { + return a + } + return b +} + // PeerInfo represents a short summary of the Ethereum sub-protocol metadata known // about a connected peer. type PeerInfo struct { @@ -86,48 +102,48 @@ type peer struct { td *big.Int lock sync.RWMutex - knownTxs mapset.Set // Set of transaction hashes known to be known by this peer - knownBlocks mapset.Set // Set of block hashes known to be known by this peer - queuedTxs chan []*types.Transaction // Queue of transactions to broadcast to the peer - queuedProps chan *propEvent // Queue of blocks to broadcast to the peer - queuedAnns chan *types.Block // Queue of blocks to announce to the peer - term chan struct{} // Termination channel to stop the broadcaster + knownTxs mapset.Set // Set of transaction hashes known to be known by this peer + knownBlocks mapset.Set // Set of block hashes known to be known by this peer + queuedBlocks chan *propEvent // Queue of blocks to broadcast to the peer + queuedBlockAnns chan *types.Block // Queue of blocks to announce to the peer + txPropagation chan []common.Hash // Channel used to queue transaction propagation requests + txAnnounce chan []common.Hash // Channel used to queue transaction announcement requests + txRetrieval chan []common.Hash // Channel used to queue transaction retrieval requests + getPooledTx func(common.Hash) *types.Transaction // Callback used to retrieve transaction from txpool + term chan struct{} // Termination channel to stop the broadcaster } -func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter) *peer { +func newPeer(version int, p *p2p.Peer, rw p2p.MsgReadWriter, getPooledTx func(hash common.Hash) *types.Transaction) *peer { return &peer{ - Peer: p, - rw: rw, - version: version, - id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), - knownTxs: mapset.NewSet(), - knownBlocks: mapset.NewSet(), - queuedTxs: make(chan []*types.Transaction, maxQueuedTxs), - queuedProps: make(chan *propEvent, maxQueuedProps), - queuedAnns: make(chan *types.Block, maxQueuedAnns), - term: make(chan struct{}), + Peer: p, + rw: rw, + version: version, + id: fmt.Sprintf("%x", p.ID().Bytes()[:8]), + knownTxs: mapset.NewSet(), + knownBlocks: mapset.NewSet(), + queuedBlocks: make(chan *propEvent, maxQueuedBlocks), + queuedBlockAnns: make(chan *types.Block, maxQueuedBlockAnns), + txPropagation: make(chan []common.Hash), + txAnnounce: make(chan []common.Hash), + txRetrieval: make(chan []common.Hash), + getPooledTx: getPooledTx, + term: make(chan struct{}), } } -// broadcast is a write loop that multiplexes block propagations, announcements -// and transaction broadcasts into the remote peer. The goal is to have an async -// writer that does not lock up node internals. -func (p *peer) broadcast() { +// broadcastBlocks is a write loop that multiplexes block propagations, +// announcements into the remote peer. The goal is to have an async writer +// that does not lock up node internals. +func (p *peer) broadcastBlocks() { for { select { - case txs := <-p.queuedTxs: - if err := p.SendTransactions(txs); err != nil { - return - } - p.Log().Trace("Broadcast transactions", "count", len(txs)) - - case prop := <-p.queuedProps: + case prop := <-p.queuedBlocks: if err := p.SendNewBlock(prop.block, prop.td); err != nil { return } p.Log().Trace("Propagated block", "number", prop.block.Number(), "hash", prop.block.Hash(), "td", prop.td) - case block := <-p.queuedAnns: + case block := <-p.queuedBlockAnns: if err := p.SendNewBlockHashes([]common.Hash{block.Hash()}, []uint64{block.NumberU64()}); err != nil { return } @@ -139,6 +155,175 @@ func (p *peer) broadcast() { } } +// broadcastTxs is a write loop that multiplexes transaction propagations, +// announcements into the remote peer. The goal is to have an async writer +// that does not lock up node internals. +func (p *peer) broadcastTxs() { + var ( + txProps []common.Hash // Queue of transaction propagations to the peer + txAnnos []common.Hash // Queue of transaction announcements to the peer + done chan struct{} // Non-nil if background network sender routine is active. + errch = make(chan error) // Channel used to receive network error + ) + scheduleTask := func() { + // Short circuit if there already has a inflight task. + if done != nil { + return + } + // Spin up transaction propagation task if there is any + // queued hashes. + if len(txProps) > 0 { + var ( + hashes []common.Hash + txs []*types.Transaction + size common.StorageSize + ) + for i := 0; i < len(txProps) && size < txsyncPackSize; i++ { + if tx := p.getPooledTx(txProps[i]); tx != nil { + txs = append(txs, tx) + size += tx.Size() + } + hashes = append(hashes, txProps[i]) + } + txProps = txProps[:copy(txProps, txProps[len(hashes):])] + if len(txs) > 0 { + done = make(chan struct{}) + go func() { + if err := p.SendNewTransactions(txs); err != nil { + errch <- err + return + } + close(done) + p.Log().Trace("Sent transactions", "count", len(txs)) + }() + return + } + } + // Spin up transaction announcement task if there is any + // queued hashes. + if len(txAnnos) > 0 { + var ( + hashes []common.Hash + pending []common.Hash + size common.StorageSize + ) + for i := 0; i < len(txAnnos) && size < txsyncPackSize; i++ { + if tx := p.getPooledTx(txAnnos[i]); tx != nil { + pending = append(pending, txAnnos[i]) + size += common.HashLength + } + hashes = append(hashes, txAnnos[i]) + } + txAnnos = txAnnos[:copy(txAnnos, txAnnos[len(hashes):])] + if len(pending) > 0 { + done = make(chan struct{}) + go func() { + if err := p.SendNewTransactionHashes(pending); err != nil { + errch <- err + return + } + close(done) + p.Log().Trace("Sent transaction announcements", "count", len(pending)) + }() + } + } + } + + for { + scheduleTask() + select { + case hashes := <-p.txPropagation: + if len(txProps) == maxQueuedTxs { + continue + } + if len(txProps)+len(hashes) > maxQueuedTxs { + hashes = hashes[:maxQueuedTxs-len(txProps)] + } + txProps = append(txProps, hashes...) + + case hashes := <-p.txAnnounce: + if len(txAnnos) == maxQueuedTxAnns { + continue + } + if len(txAnnos)+len(hashes) > maxQueuedTxAnns { + hashes = hashes[:maxQueuedTxAnns-len(txAnnos)] + } + txAnnos = append(txAnnos, hashes...) + + case <-done: + done = nil + + case <-errch: + return + + case <-p.term: + return + } + } +} + +// retrievalTxs is a write loop which is responsible for retrieving transaction +// from the remote peer. The goal is to have an async writer that does not lock +// up node internals. If there are too many requests queued, then new arrival +// requests will be dropped silently so that we can ensure the memory assumption +// is fixed for each peer. +func (p *peer) retrievalTxs() { + var ( + requests []common.Hash // Queue of transaction requests to the peer + done chan struct{} // Non-nil if background network sender routine is active. + errch = make(chan error) // Channel used to receive network error + ) + // pick chooses a reasonble number of transaction hashes for retrieval. + pick := func() []common.Hash { + var ret []common.Hash + if len(requests) > fetcher.MaxTransactionFetch { + ret = requests[:fetcher.MaxTransactionFetch] + } else { + ret = requests[:] + } + requests = requests[:copy(requests, requests[len(ret):])] + return ret + } + // send sends transactions retrieval request. + send := func(hashes []common.Hash, done chan struct{}) { + if err := p.RequestTxs(hashes); err != nil { + errch <- err + return + } + close(done) + p.Log().Trace("Sent transaction retrieval request", "count", len(hashes)) + } + for { + select { + case hashes := <-p.txRetrieval: + if len(requests) == maxQueuedTxRetrieval { + continue + } + if len(requests)+len(hashes) > maxQueuedTxRetrieval { + hashes = hashes[:maxQueuedTxRetrieval-len(requests)] + } + requests = append(requests, hashes...) + if done == nil { + done = make(chan struct{}) + go send(pick(), done) + } + + case <-done: + done = nil + if pending := pick(); len(pending) > 0 { + done = make(chan struct{}) + go send(pending, done) + } + + case <- errch: + return + + case <-p.term: + return + } + } +} + // close signals the broadcast goroutine to terminate. func (p *peer) close() { close(p.term) @@ -194,33 +379,67 @@ func (p *peer) MarkTransaction(hash common.Hash) { p.knownTxs.Add(hash) } -// SendTransactions sends transactions to the peer and includes the hashes -// in its transaction hash set for future reference. -func (p *peer) SendTransactions(txs types.Transactions) error { +// SendNewTransactionHashes sends a batch of transaction hashes to the peer and +// includes the hashes in its transaction hash set for future reference. +func (p *peer) SendNewTransactionHashes(hashes []common.Hash) error { // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) { + p.knownTxs.Pop() + } + for _, hash := range hashes { + p.knownTxs.Add(hash) + } + return p2p.Send(p.rw, NewPooledTransactionHashesMsg, hashes) +} + +// SendNewTransactions sends transactions to the peer and includes the hashes +// in its transaction hash set for future reference. +func (p *peer) SendNewTransactions(txs types.Transactions) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(txs)) { + p.knownTxs.Pop() + } for _, tx := range txs { p.knownTxs.Add(tx.Hash()) } - for p.knownTxs.Cardinality() >= maxKnownTxs { - p.knownTxs.Pop() - } + return p2p.Send(p.rw, TxMsg, txs) +} + +func (p *peer) SendTransactionRLP(txs []rlp.RawValue) error { return p2p.Send(p.rw, TxMsg, txs) } // AsyncSendTransactions queues list of transactions propagation to a remote // peer. If the peer's broadcast queue is full, the event is silently dropped. -func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { +func (p *peer) AsyncSendTransactions(hashes []common.Hash) { select { - case p.queuedTxs <- txs: + case p.txPropagation <- hashes: // Mark all the transactions as known, but ensure we don't overflow our limits - for _, tx := range txs { - p.knownTxs.Add(tx.Hash()) - } - for p.knownTxs.Cardinality() >= maxKnownTxs { + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) { p.knownTxs.Pop() } - default: - p.Log().Debug("Dropping transaction propagation", "count", len(txs)) + for _, hash := range hashes { + p.knownTxs.Add(hash) + } + case <-p.term: + p.Log().Debug("Dropping transaction propagation", "count", len(hashes)) + } +} + +// AsyncSendTransactions queues list of transactions propagation to a remote +// peer. If the peer's broadcast queue is full, the event is silently dropped. +func (p *peer) AsyncSendTransactionHashes(hashes []common.Hash) { + select { + case p.txAnnounce <- hashes: + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) { + p.knownTxs.Pop() + } + for _, hash := range hashes { + p.knownTxs.Add(hash) + } + case <-p.term: + p.Log().Debug("Dropping transaction announcement", "count", len(hashes)) } } @@ -228,12 +447,12 @@ func (p *peer) AsyncSendTransactions(txs []*types.Transaction) { // a hash notification. func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error { // Mark all the block hashes as known, but ensure we don't overflow our limits + for p.knownBlocks.Cardinality() > max(0, maxKnownBlocks-len(hashes)) { + p.knownBlocks.Pop() + } for _, hash := range hashes { p.knownBlocks.Add(hash) } - for p.knownBlocks.Cardinality() >= maxKnownBlocks { - p.knownBlocks.Pop() - } request := make(newBlockHashesData, len(hashes)) for i := 0; i < len(hashes); i++ { request[i].Hash = hashes[i] @@ -247,12 +466,12 @@ func (p *peer) SendNewBlockHashes(hashes []common.Hash, numbers []uint64) error // dropped. func (p *peer) AsyncSendNewBlockHash(block *types.Block) { select { - case p.queuedAnns <- block: + case p.queuedBlockAnns <- block: // Mark all the block hash as known, but ensure we don't overflow our limits - p.knownBlocks.Add(block.Hash()) for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop() } + p.knownBlocks.Add(block.Hash()) default: p.Log().Debug("Dropping block announcement", "number", block.NumberU64(), "hash", block.Hash()) } @@ -261,10 +480,10 @@ func (p *peer) AsyncSendNewBlockHash(block *types.Block) { // SendNewBlock propagates an entire block to a remote peer. func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { // Mark all the block hash as known, but ensure we don't overflow our limits - p.knownBlocks.Add(block.Hash()) for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop() } + p.knownBlocks.Add(block.Hash()) return p2p.Send(p.rw, NewBlockMsg, []interface{}{block, td}) } @@ -272,12 +491,12 @@ func (p *peer) SendNewBlock(block *types.Block, td *big.Int) error { // the peer's broadcast queue is full, the event is silently dropped. func (p *peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { select { - case p.queuedProps <- &propEvent{block: block, td: td}: + case p.queuedBlocks <- &propEvent{block: block, td: td}: // Mark all the block hash as known, but ensure we don't overflow our limits - p.knownBlocks.Add(block.Hash()) for p.knownBlocks.Cardinality() >= maxKnownBlocks { p.knownBlocks.Pop() } + p.knownBlocks.Add(block.Hash()) default: p.Log().Debug("Dropping block propagation", "number", block.NumberU64(), "hash", block.Hash()) } @@ -352,6 +571,22 @@ func (p *peer) RequestReceipts(hashes []common.Hash) error { return p2p.Send(p.rw, GetReceiptsMsg, hashes) } +// RequestTxs fetches a batch of transactions from a remote node. +func (p *peer) RequestTxs(hashes []common.Hash) error { + p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) + return p2p.Send(p.rw, GetPooledTransactionsMsg, hashes) +} + +// AsyncRequestTxs queues a tx retrieval request to a remote peer. If +// the peer's retrieval queue is full, the event is silently dropped. +func (p *peer) AsyncRequestTxs(hashes []common.Hash) { + select { + case p.txRetrieval <- hashes: + case <-p.term: + p.Log().Debug("Dropping transaction retrieval request", "count", len(hashes)) + } +} + // Handshake executes the eth protocol handshake, negotiating version number, // network IDs, difficulties, head and genesis blocks. func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis common.Hash, forkID forkid.ID, forkFilter forkid.Filter) error { @@ -372,7 +607,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis CurrentBlock: head, GenesisBlock: genesis, }) - case p.version == eth64: + case p.version >= eth64: errc <- p2p.Send(p.rw, StatusMsg, &statusData{ ProtocolVersion: uint32(p.version), NetworkID: network, @@ -389,7 +624,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis switch { case p.version == eth63: errc <- p.readStatusLegacy(network, &status63, genesis) - case p.version == eth64: + case p.version >= eth64: errc <- p.readStatus(network, &status, genesis, forkFilter) default: panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version)) @@ -410,7 +645,7 @@ func (p *peer) Handshake(network uint64, td *big.Int, head common.Hash, genesis switch { case p.version == eth63: p.td, p.head = status63.TD, status63.CurrentBlock - case p.version == eth64: + case p.version >= eth64: p.td, p.head = status.TD, status.Head default: panic(fmt.Sprintf("unsupported eth protocol version: %d", p.version)) @@ -511,7 +746,9 @@ func (ps *peerSet) Register(p *peer) error { return errAlreadyRegistered } ps.peers[p.id] = p - go p.broadcast() + go p.broadcastBlocks() + go p.broadcastTxs() + go p.retrievalTxs() return nil } diff --git a/eth/protocol.go b/eth/protocol.go index 62e4d13d1..1cef66adb 100644 --- a/eth/protocol.go +++ b/eth/protocol.go @@ -33,16 +33,17 @@ import ( const ( eth63 = 63 eth64 = 64 + eth65 = 65 ) // protocolName is the official short name of the protocol used during capability negotiation. const protocolName = "eth" // ProtocolVersions are the supported versions of the eth protocol (first is primary). -var ProtocolVersions = []uint{eth64, eth63} +var ProtocolVersions = []uint{eth65, eth64, eth63} // protocolLengths are the number of implemented message corresponding to different protocol versions. -var protocolLengths = map[uint]uint64{eth64: 17, eth63: 17} +var protocolLengths = map[uint]uint64{eth65: 17, eth64: 17, eth63: 17} const protocolMaxMsgSize = 10 * 1024 * 1024 // Maximum cap on the size of a protocol message @@ -60,6 +61,13 @@ const ( NodeDataMsg = 0x0e GetReceiptsMsg = 0x0f ReceiptsMsg = 0x10 + + // New protocol message codes introduced in eth65 + // + // Previously these message ids(0x08, 0x09) were used by some + // legacy and unsupported eth protocols, reown them here. + NewPooledTransactionHashesMsg = 0x08 + GetPooledTransactionsMsg = 0x09 ) type errCode int @@ -94,6 +102,14 @@ var errorToString = map[int]string{ } type txPool interface { + // Has returns an indicator whether txpool has a transaction + // cached with the given hash. + Has(hash common.Hash) bool + + // Get retrieves the transaction from local txpool with given + // tx hash. + Get(hash common.Hash) *types.Transaction + // AddRemotes should add the given transactions to the pool. AddRemotes([]*types.Transaction) []error diff --git a/eth/protocol_test.go b/eth/protocol_test.go index ca418942b..e9a1a511e 100644 --- a/eth/protocol_test.go +++ b/eth/protocol_test.go @@ -20,6 +20,7 @@ import ( "fmt" "math/big" "sync" + "sync/atomic" "testing" "time" @@ -180,16 +181,16 @@ func TestForkIDSplit(t *testing.T) { blocksNoFork, _ = core.GenerateChain(configNoFork, genesisNoFork, engine, dbNoFork, 2, nil) blocksProFork, _ = core.GenerateChain(configProFork, genesisProFork, engine, dbProFork, 2, nil) - ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainNoFork, dbNoFork, 1, nil) - ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), new(testTxPool), engine, chainProFork, dbProFork, 1, nil) + ethNoFork, _ = NewProtocolManager(configNoFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainNoFork, dbNoFork, 1, nil) + ethProFork, _ = NewProtocolManager(configProFork, nil, downloader.FullSync, 1, new(event.TypeMux), &testTxPool{pool: make(map[common.Hash]*types.Transaction)}, engine, chainProFork, dbProFork, 1, nil) ) ethNoFork.Start(1000) ethProFork.Start(1000) // Both nodes should allow the other to connect (same genesis, next fork is the same) p2pNoFork, p2pProFork := p2p.MsgPipe() - peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork) - peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork) + peerNoFork := newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) + peerProFork := newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) errc := make(chan error, 2) go func() { errc <- ethNoFork.handle(peerProFork) }() @@ -207,8 +208,8 @@ func TestForkIDSplit(t *testing.T) { chainProFork.InsertChain(blocksProFork[:1]) p2pNoFork, p2pProFork = p2p.MsgPipe() - peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork) - peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork) + peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) + peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) errc = make(chan error, 2) go func() { errc <- ethNoFork.handle(peerProFork) }() @@ -226,8 +227,8 @@ func TestForkIDSplit(t *testing.T) { chainProFork.InsertChain(blocksProFork[1:2]) p2pNoFork, p2pProFork = p2p.MsgPipe() - peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork) - peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork) + peerNoFork = newPeer(64, p2p.NewPeer(enode.ID{1}, "", nil), p2pNoFork, nil) + peerProFork = newPeer(64, p2p.NewPeer(enode.ID{2}, "", nil), p2pProFork, nil) errc = make(chan error, 2) go func() { errc <- ethNoFork.handle(peerProFork) }() @@ -246,6 +247,7 @@ func TestForkIDSplit(t *testing.T) { // This test checks that received transactions are added to the local pool. func TestRecvTransactions63(t *testing.T) { testRecvTransactions(t, 63) } func TestRecvTransactions64(t *testing.T) { testRecvTransactions(t, 64) } +func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, 65) } func testRecvTransactions(t *testing.T, protocol int) { txAdded := make(chan []*types.Transaction) @@ -274,6 +276,7 @@ func testRecvTransactions(t *testing.T, protocol int) { // This test checks that pending transactions are sent. func TestSendTransactions63(t *testing.T) { testSendTransactions(t, 63) } func TestSendTransactions64(t *testing.T) { testSendTransactions(t, 64) } +func TestSendTransactions65(t *testing.T) { testSendTransactions(t, 65) } func testSendTransactions(t *testing.T, protocol int) { pm, _ := newTestProtocolManagerMust(t, downloader.FullSync, 0, nil, nil) @@ -298,17 +301,43 @@ func testSendTransactions(t *testing.T, protocol int) { } for n := 0; n < len(alltxs) && !t.Failed(); { var txs []*types.Transaction - msg, err := p.app.ReadMsg() - if err != nil { - t.Errorf("%v: read error: %v", p.Peer, err) - } else if msg.Code != TxMsg { - t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + var hashes []common.Hash + var forAllHashes func(callback func(hash common.Hash)) + switch protocol { + case 63: + fallthrough + case 64: + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != TxMsg { + t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&txs); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + forAllHashes = func(callback func(hash common.Hash)) { + for _, tx := range txs { + callback(tx.Hash()) + } + } + case 65: + msg, err := p.app.ReadMsg() + if err != nil { + t.Errorf("%v: read error: %v", p.Peer, err) + } else if msg.Code != NewPooledTransactionHashesMsg { + t.Errorf("%v: got code %d, want TxMsg", p.Peer, msg.Code) + } + if err := msg.Decode(&hashes); err != nil { + t.Errorf("%v: %v", p.Peer, err) + } + forAllHashes = func(callback func(hash common.Hash)) { + for _, h := range hashes { + callback(h) + } + } } - if err := msg.Decode(&txs); err != nil { - t.Errorf("%v: %v", p.Peer, err) - } - for _, tx := range txs { - hash := tx.Hash() + forAllHashes(func(hash common.Hash) { seentx, want := seen[hash] if seentx { t.Errorf("%v: got tx more than once: %x", p.Peer, hash) @@ -318,7 +347,7 @@ func testSendTransactions(t *testing.T, protocol int) { } seen[hash] = true n++ - } + }) } } for i := 0; i < 3; i++ { @@ -329,6 +358,53 @@ func testSendTransactions(t *testing.T, protocol int) { wg.Wait() } +func TestTransactionPropagation(t *testing.T) { testSyncTransaction(t, true) } +func TestTransactionAnnouncement(t *testing.T) { testSyncTransaction(t, false) } + +func testSyncTransaction(t *testing.T, propagtion bool) { + // Create a protocol manager for transaction fetcher and sender + pmFetcher, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) + defer pmFetcher.Stop() + pmSender, _ := newTestProtocolManagerMust(t, downloader.FastSync, 1024, nil, nil) + pmSender.broadcastTxAnnouncesOnly = !propagtion + defer pmSender.Stop() + + // Sync up the two peers + io1, io2 := p2p.MsgPipe() + + go pmSender.handle(pmSender.newPeer(65, p2p.NewPeer(enode.ID{}, "sender", nil), io2, pmSender.txpool.Get)) + go pmFetcher.handle(pmFetcher.newPeer(65, p2p.NewPeer(enode.ID{}, "fetcher", nil), io1, pmFetcher.txpool.Get)) + + time.Sleep(250 * time.Millisecond) + pmFetcher.synchronise(pmFetcher.peers.BestPeer()) + atomic.StoreUint32(&pmFetcher.acceptTxs, 1) + + newTxs := make(chan core.NewTxsEvent, 1024) + sub := pmFetcher.txpool.SubscribeNewTxsEvent(newTxs) + defer sub.Unsubscribe() + + // Fill the pool with new transactions + alltxs := make([]*types.Transaction, 1024) + for nonce := range alltxs { + alltxs[nonce] = newTestTransaction(testAccount, uint64(nonce), 0) + } + pmSender.txpool.AddRemotes(alltxs) + + var got int +loop: + for { + select { + case ev := <-newTxs: + got += len(ev.Txs) + if got == 1024 { + break loop + } + case <-time.NewTimer(time.Second).C: + t.Fatal("Failed to retrieve all transaction") + } + } +} + // Tests that the custom union field encoder and decoder works correctly. func TestGetBlockHeadersDataEncodeDecode(t *testing.T) { // Create a "random" hash for testing diff --git a/eth/sync.go b/eth/sync.go index 9e180ee20..93b2dd2ec 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -38,8 +38,9 @@ const ( ) type txsync struct { - p *peer - txs []*types.Transaction + p *peer + hashes []common.Hash + txs []*types.Transaction } // syncTransactions starts sending all currently pending transactions to the given peer. @@ -53,7 +54,7 @@ func (pm *ProtocolManager) syncTransactions(p *peer) { return } select { - case pm.txsyncCh <- &txsync{p, txs}: + case pm.txsyncCh <- &txsync{p: p, txs: txs}: case <-pm.quitSync: } } @@ -69,26 +70,46 @@ func (pm *ProtocolManager) txsyncLoop() { pack = new(txsync) // the pack that is being sent done = make(chan error, 1) // result of the send ) - // send starts a sending a pack of transactions from the sync. send := func(s *txsync) { // Fill pack with transactions up to the target size. size := common.StorageSize(0) pack.p = s.p + pack.hashes = pack.hashes[:0] pack.txs = pack.txs[:0] - for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { - pack.txs = append(pack.txs, s.txs[i]) - size += s.txs[i].Size() + if s.p.version >= eth65 { + // Eth65 introduces transaction announcement https://github.com/ethereum/EIPs/pull/2464, + // only txhashes are transferred here. + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.hashes = append(pack.hashes, s.txs[i].Hash()) + size += common.HashLength + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.hashes):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transaction announcements", "count", len(pack.hashes), "bytes", size) + sending = true + go func() { done <- pack.p.SendNewTransactionHashes(pack.hashes) }() + } else { + // Legacy eth protocol doesn't have transaction announcement protocol + // message, transfer the whole pending transaction slice. + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) + sending = true + go func() { done <- pack.p.SendNewTransactions(pack.txs) }() } - // Remove the transactions that will be sent. - s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] - if len(s.txs) == 0 { - delete(pending, s.p.ID()) - } - // Send the pack in the background. - s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) - sending = true - go func() { done <- pack.p.SendTransactions(pack.txs) }() } // pick chooses the next pending sync. @@ -133,8 +154,10 @@ func (pm *ProtocolManager) txsyncLoop() { // downloading hashes and blocks as well as handling the announcement handler. func (pm *ProtocolManager) syncer() { // Start and ensure cleanup of sync mechanisms - pm.fetcher.Start() - defer pm.fetcher.Stop() + pm.blockFetcher.Start() + pm.txFetcher.Start() + defer pm.blockFetcher.Stop() + defer pm.txFetcher.Stop() defer pm.downloader.Terminate() // Wait for different events to fire synchronisation operations diff --git a/eth/sync_test.go b/eth/sync_test.go index e4c99ff58..d02bc5710 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -26,9 +26,13 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" ) +func TestFastSyncDisabling63(t *testing.T) { testFastSyncDisabling(t, 63) } +func TestFastSyncDisabling64(t *testing.T) { testFastSyncDisabling(t, 64) } +func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, 65) } + // Tests that fast sync gets disabled as soon as a real block is successfully // imported into the blockchain. -func TestFastSyncDisabling(t *testing.T) { +func testFastSyncDisabling(t *testing.T, protocol int) { // Create a pristine protocol manager, check that fast sync is left enabled pmEmpty, _ := newTestProtocolManagerMust(t, downloader.FastSync, 0, nil, nil) if atomic.LoadUint32(&pmEmpty.fastSync) == 0 { @@ -42,8 +46,8 @@ func TestFastSyncDisabling(t *testing.T) { // Sync up the two peers io1, io2 := p2p.MsgPipe() - go pmFull.handle(pmFull.newPeer(63, p2p.NewPeer(enode.ID{}, "empty", nil), io2)) - go pmEmpty.handle(pmEmpty.newPeer(63, p2p.NewPeer(enode.ID{}, "full", nil), io1)) + go pmFull.handle(pmFull.newPeer(protocol, p2p.NewPeer(enode.ID{}, "empty", nil), io2, pmFull.txpool.Get)) + go pmEmpty.handle(pmEmpty.newPeer(protocol, p2p.NewPeer(enode.ID{}, "full", nil), io1, pmEmpty.txpool.Get)) time.Sleep(250 * time.Millisecond) pmEmpty.synchronise(pmEmpty.peers.BestPeer())