From c368f728c19e7fd7a9613513edda68ffcb503af0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?P=C3=A9ter=20Szil=C3=A1gyi?= Date: Fri, 20 Aug 2021 15:14:21 +0300 Subject: [PATCH] Revert "eth: drop eth/65, the last non-reqid protocol version" (#23426) --- cmd/devp2p/internal/ethtest/suite_test.go | 2 +- eth/downloader/downloader.go | 4 +- eth/downloader/downloader_test.go | 95 ++++++++++- eth/downloader/peer.go | 8 +- eth/handler.go | 5 +- eth/handler_eth_test.go | 38 ++--- eth/protocols/eth/handler.go | 47 ++++-- eth/protocols/eth/handler_test.go | 134 +++++++++------ eth/protocols/eth/handlers.go | 107 ++++++++++++ eth/protocols/eth/handshake_test.go | 1 + eth/protocols/eth/peer.go | 194 +++++++++++++++------- eth/protocols/eth/protocol.go | 34 ++-- eth/sync.go | 102 +++++++++++- eth/sync_test.go | 1 + les/client_handler.go | 2 +- 15 files changed, 602 insertions(+), 172 deletions(-) diff --git a/cmd/devp2p/internal/ethtest/suite_test.go b/cmd/devp2p/internal/ethtest/suite_test.go index 6d14404e6..6e3217151 100644 --- a/cmd/devp2p/internal/ethtest/suite_test.go +++ b/cmd/devp2p/internal/ethtest/suite_test.go @@ -45,7 +45,7 @@ func TestEthSuite(t *testing.T) { if err != nil { t.Fatalf("could not create new test suite: %v", err) } - for _, test := range suite.Eth66Tests() { + for _, test := range suite.AllEthTests() { t.Run(test.Name, func(t *testing.T) { result := utesting.RunTAP([]utesting.Test{{Name: test.Name, Fn: test.Fn}}, os.Stdout) if result[0].Failed { diff --git a/eth/downloader/downloader.go b/eth/downloader/downloader.go index a6bf87acb..1767506a3 100644 --- a/eth/downloader/downloader.go +++ b/eth/downloader/downloader.go @@ -448,8 +448,8 @@ func (d *Downloader) syncWithPeer(p *peerConnection, hash common.Hash, td *big.I d.mux.Post(DoneEvent{latest}) } }() - if p.version < eth.ETH66 { - return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH66) + if p.version < eth.ETH65 { + return fmt.Errorf("%w: advertized %d < required %d", errTooOld, p.version, eth.ETH65) } mode := d.getMode() diff --git a/eth/downloader/downloader_test.go b/eth/downloader/downloader_test.go index 17cd3630c..794160993 100644 --- a/eth/downloader/downloader_test.go +++ b/eth/downloader/downloader_test.go @@ -522,6 +522,10 @@ func assertOwnForkedChain(t *testing.T, tester *downloadTester, common int, leng } } +func TestCanonicalSynchronisation65Full(t *testing.T) { testCanonSync(t, eth.ETH65, FullSync) } +func TestCanonicalSynchronisation65Fast(t *testing.T) { testCanonSync(t, eth.ETH65, FastSync) } +func TestCanonicalSynchronisation65Light(t *testing.T) { testCanonSync(t, eth.ETH65, LightSync) } + func TestCanonicalSynchronisation66Full(t *testing.T) { testCanonSync(t, eth.ETH66, FullSync) } func TestCanonicalSynchronisation66Fast(t *testing.T) { testCanonSync(t, eth.ETH66, FastSync) } func TestCanonicalSynchronisation66Light(t *testing.T) { testCanonSync(t, eth.ETH66, LightSync) } @@ -545,6 +549,9 @@ func testCanonSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that if a large batch of blocks are being downloaded, it is throttled // until the cached blocks are retrieved. +func TestThrottling65Full(t *testing.T) { testThrottling(t, eth.ETH65, FullSync) } +func TestThrottling65Fast(t *testing.T) { testThrottling(t, eth.ETH65, FastSync) } + func TestThrottling66Full(t *testing.T) { testThrottling(t, eth.ETH66, FullSync) } func TestThrottling66Fast(t *testing.T) { testThrottling(t, eth.ETH66, FastSync) } @@ -627,6 +634,10 @@ func testThrottling(t *testing.T, protocol uint, mode SyncMode) { // Tests that simple synchronization against a forked chain works correctly. In // this test common ancestor lookup should *not* be short circuited, and a full // binary search should be executed. +func TestForkedSync65Full(t *testing.T) { testForkedSync(t, eth.ETH65, FullSync) } +func TestForkedSync65Fast(t *testing.T) { testForkedSync(t, eth.ETH65, FastSync) } +func TestForkedSync65Light(t *testing.T) { testForkedSync(t, eth.ETH65, LightSync) } + func TestForkedSync66Full(t *testing.T) { testForkedSync(t, eth.ETH66, FullSync) } func TestForkedSync66Fast(t *testing.T) { testForkedSync(t, eth.ETH66, FastSync) } func TestForkedSync66Light(t *testing.T) { testForkedSync(t, eth.ETH66, LightSync) } @@ -656,6 +667,10 @@ func testForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that synchronising against a much shorter but much heavyer fork works // corrently and is not dropped. +func TestHeavyForkedSync65Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH65, FullSync) } +func TestHeavyForkedSync65Fast(t *testing.T) { testHeavyForkedSync(t, eth.ETH65, FastSync) } +func TestHeavyForkedSync65Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH65, LightSync) } + func TestHeavyForkedSync66Full(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, FullSync) } func TestHeavyForkedSync66Fast(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, FastSync) } func TestHeavyForkedSync66Light(t *testing.T) { testHeavyForkedSync(t, eth.ETH66, LightSync) } @@ -687,6 +702,10 @@ func testHeavyForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that chain forks are contained within a certain interval of the current // chain head, ensuring that malicious peers cannot waste resources by feeding // long dead chains. +func TestBoundedForkedSync65Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH65, FullSync) } +func TestBoundedForkedSync65Fast(t *testing.T) { testBoundedForkedSync(t, eth.ETH65, FastSync) } +func TestBoundedForkedSync65Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH65, LightSync) } + func TestBoundedForkedSync66Full(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, FullSync) } func TestBoundedForkedSync66Fast(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, FastSync) } func TestBoundedForkedSync66Light(t *testing.T) { testBoundedForkedSync(t, eth.ETH66, LightSync) } @@ -717,6 +736,16 @@ func testBoundedForkedSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that chain forks are contained within a certain interval of the current // chain head for short but heavy forks too. These are a bit special because they // take different ancestor lookup paths. +func TestBoundedHeavyForkedSync65Full(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH65, FullSync) +} +func TestBoundedHeavyForkedSync65Fast(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH65, FastSync) +} +func TestBoundedHeavyForkedSync65Light(t *testing.T) { + testBoundedHeavyForkedSync(t, eth.ETH65, LightSync) +} + func TestBoundedHeavyForkedSync66Full(t *testing.T) { testBoundedHeavyForkedSync(t, eth.ETH66, FullSync) } @@ -771,6 +800,10 @@ func TestInactiveDownloader63(t *testing.T) { } // Tests that a canceled download wipes all previously accumulated state. +func TestCancel65Full(t *testing.T) { testCancel(t, eth.ETH65, FullSync) } +func TestCancel65Fast(t *testing.T) { testCancel(t, eth.ETH65, FastSync) } +func TestCancel65Light(t *testing.T) { testCancel(t, eth.ETH65, LightSync) } + func TestCancel66Full(t *testing.T) { testCancel(t, eth.ETH66, FullSync) } func TestCancel66Fast(t *testing.T) { testCancel(t, eth.ETH66, FastSync) } func TestCancel66Light(t *testing.T) { testCancel(t, eth.ETH66, LightSync) } @@ -800,6 +833,10 @@ func testCancel(t *testing.T, protocol uint, mode SyncMode) { } // Tests that synchronisation from multiple peers works as intended (multi thread sanity test). +func TestMultiSynchronisation65Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH65, FullSync) } +func TestMultiSynchronisation65Fast(t *testing.T) { testMultiSynchronisation(t, eth.ETH65, FastSync) } +func TestMultiSynchronisation65Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH65, LightSync) } + func TestMultiSynchronisation66Full(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, FullSync) } func TestMultiSynchronisation66Fast(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, FastSync) } func TestMultiSynchronisation66Light(t *testing.T) { testMultiSynchronisation(t, eth.ETH66, LightSync) } @@ -826,6 +863,10 @@ func testMultiSynchronisation(t *testing.T, protocol uint, mode SyncMode) { // Tests that synchronisations behave well in multi-version protocol environments // and not wreak havoc on other nodes in the network. +func TestMultiProtoSynchronisation65Full(t *testing.T) { testMultiProtoSync(t, eth.ETH65, FullSync) } +func TestMultiProtoSynchronisation65Fast(t *testing.T) { testMultiProtoSync(t, eth.ETH65, FastSync) } +func TestMultiProtoSynchronisation65Light(t *testing.T) { testMultiProtoSync(t, eth.ETH65, LightSync) } + func TestMultiProtoSynchronisation66Full(t *testing.T) { testMultiProtoSync(t, eth.ETH66, FullSync) } func TestMultiProtoSynchronisation66Fast(t *testing.T) { testMultiProtoSync(t, eth.ETH66, FastSync) } func TestMultiProtoSynchronisation66Light(t *testing.T) { testMultiProtoSync(t, eth.ETH66, LightSync) } @@ -840,8 +881,8 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { chain := testChainBase.shorten(blockCacheMaxItems - 15) // Create peers of every type + tester.newPeer("peer 65", eth.ETH65, chain) tester.newPeer("peer 66", eth.ETH66, chain) - //tester.newPeer("peer 65", eth.ETH67, chain) // Synchronise with the requested peer and make sure all blocks were retrieved if err := tester.sync(fmt.Sprintf("peer %d", protocol), nil, mode); err != nil { @@ -850,7 +891,7 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { assertOwnChain(t, tester, chain.len()) // Check that no peers have been dropped off - for _, version := range []int{66} { + for _, version := range []int{65, 66} { peer := fmt.Sprintf("peer %d", version) if _, ok := tester.peers[peer]; !ok { t.Errorf("%s dropped", peer) @@ -860,6 +901,10 @@ func testMultiProtoSync(t *testing.T, protocol uint, mode SyncMode) { // Tests that if a block is empty (e.g. header only), no body request should be // made, and instead the header should be assembled into a whole block in itself. +func TestEmptyShortCircuit65Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH65, FullSync) } +func TestEmptyShortCircuit65Fast(t *testing.T) { testEmptyShortCircuit(t, eth.ETH65, FastSync) } +func TestEmptyShortCircuit65Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH65, LightSync) } + func TestEmptyShortCircuit66Full(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, FullSync) } func TestEmptyShortCircuit66Fast(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, FastSync) } func TestEmptyShortCircuit66Light(t *testing.T) { testEmptyShortCircuit(t, eth.ETH66, LightSync) } @@ -910,6 +955,10 @@ func testEmptyShortCircuit(t *testing.T, protocol uint, mode SyncMode) { // Tests that headers are enqueued continuously, preventing malicious nodes from // stalling the downloader by feeding gapped header chains. +func TestMissingHeaderAttack65Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH65, FullSync) } +func TestMissingHeaderAttack65Fast(t *testing.T) { testMissingHeaderAttack(t, eth.ETH65, FastSync) } +func TestMissingHeaderAttack65Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH65, LightSync) } + func TestMissingHeaderAttack66Full(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, FullSync) } func TestMissingHeaderAttack66Fast(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, FastSync) } func TestMissingHeaderAttack66Light(t *testing.T) { testMissingHeaderAttack(t, eth.ETH66, LightSync) } @@ -938,6 +987,10 @@ func testMissingHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that if requested headers are shifted (i.e. first is missing), the queue // detects the invalid numbering. +func TestShiftedHeaderAttack65Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH65, FullSync) } +func TestShiftedHeaderAttack65Fast(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH65, FastSync) } +func TestShiftedHeaderAttack65Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH65, LightSync) } + func TestShiftedHeaderAttack66Full(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, FullSync) } func TestShiftedHeaderAttack66Fast(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, FastSync) } func TestShiftedHeaderAttack66Light(t *testing.T) { testShiftedHeaderAttack(t, eth.ETH66, LightSync) } @@ -971,6 +1024,7 @@ func testShiftedHeaderAttack(t *testing.T, protocol uint, mode SyncMode) { // Tests that upon detecting an invalid header, the recent ones are rolled back // for various failure scenarios. Afterwards a full sync is attempted to make // sure no state was corrupted. +func TestInvalidHeaderRollback65Fast(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH65, FastSync) } func TestInvalidHeaderRollback66Fast(t *testing.T) { testInvalidHeaderRollback(t, eth.ETH66, FastSync) } func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) { @@ -1061,6 +1115,16 @@ func testInvalidHeaderRollback(t *testing.T, protocol uint, mode SyncMode) { // Tests that a peer advertising a high TD doesn't get to stall the downloader // afterwards by not sending any useful hashes. +func TestHighTDStarvationAttack65Full(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH65, FullSync) +} +func TestHighTDStarvationAttack65Fast(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH65, FastSync) +} +func TestHighTDStarvationAttack65Light(t *testing.T) { + testHighTDStarvationAttack(t, eth.ETH65, LightSync) +} + func TestHighTDStarvationAttack66Full(t *testing.T) { testHighTDStarvationAttack(t, eth.ETH66, FullSync) } @@ -1085,6 +1149,7 @@ func testHighTDStarvationAttack(t *testing.T, protocol uint, mode SyncMode) { } // Tests that misbehaving peers are disconnected, whilst behaving ones are not. +func TestBlockHeaderAttackerDropping65(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH65) } func TestBlockHeaderAttackerDropping66(t *testing.T) { testBlockHeaderAttackerDropping(t, eth.ETH66) } func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { @@ -1137,6 +1202,10 @@ func testBlockHeaderAttackerDropping(t *testing.T, protocol uint) { // Tests that synchronisation progress (origin block number, current block number // and highest block number) is tracked and updated correctly. +func TestSyncProgress65Full(t *testing.T) { testSyncProgress(t, eth.ETH65, FullSync) } +func TestSyncProgress65Fast(t *testing.T) { testSyncProgress(t, eth.ETH65, FastSync) } +func TestSyncProgress65Light(t *testing.T) { testSyncProgress(t, eth.ETH65, LightSync) } + func TestSyncProgress66Full(t *testing.T) { testSyncProgress(t, eth.ETH66, FullSync) } func TestSyncProgress66Fast(t *testing.T) { testSyncProgress(t, eth.ETH66, FastSync) } func TestSyncProgress66Light(t *testing.T) { testSyncProgress(t, eth.ETH66, LightSync) } @@ -1217,6 +1286,10 @@ func checkProgress(t *testing.T, d *Downloader, stage string, want ethereum.Sync // Tests that synchronisation progress (origin block number and highest block // number) is tracked and updated correctly in case of a fork (or manual head // revertal). +func TestForkedSyncProgress65Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH65, FullSync) } +func TestForkedSyncProgress65Fast(t *testing.T) { testForkedSyncProgress(t, eth.ETH65, FastSync) } +func TestForkedSyncProgress65Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH65, LightSync) } + func TestForkedSyncProgress66Full(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, FullSync) } func TestForkedSyncProgress66Fast(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, FastSync) } func TestForkedSyncProgress66Light(t *testing.T) { testForkedSyncProgress(t, eth.ETH66, LightSync) } @@ -1289,6 +1362,10 @@ func testForkedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // Tests that if synchronisation is aborted due to some failure, then the progress // origin is not updated in the next sync cycle, as it should be considered the // continuation of the previous sync and not a new instance. +func TestFailedSyncProgress65Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH65, FullSync) } +func TestFailedSyncProgress65Fast(t *testing.T) { testFailedSyncProgress(t, eth.ETH65, FastSync) } +func TestFailedSyncProgress65Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH65, LightSync) } + func TestFailedSyncProgress66Full(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, FullSync) } func TestFailedSyncProgress66Fast(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, FastSync) } func TestFailedSyncProgress66Light(t *testing.T) { testFailedSyncProgress(t, eth.ETH66, LightSync) } @@ -1358,6 +1435,10 @@ func testFailedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // Tests that if an attacker fakes a chain height, after the attack is detected, // the progress height is successfully reduced at the next sync invocation. +func TestFakedSyncProgress65Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH65, FullSync) } +func TestFakedSyncProgress65Fast(t *testing.T) { testFakedSyncProgress(t, eth.ETH65, FastSync) } +func TestFakedSyncProgress65Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH65, LightSync) } + func TestFakedSyncProgress66Full(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, FullSync) } func TestFakedSyncProgress66Fast(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, FastSync) } func TestFakedSyncProgress66Light(t *testing.T) { testFakedSyncProgress(t, eth.ETH66, LightSync) } @@ -1431,6 +1512,10 @@ func testFakedSyncProgress(t *testing.T, protocol uint, mode SyncMode) { // This test reproduces an issue where unexpected deliveries would // block indefinitely if they arrived at the right time. +func TestDeliverHeadersHang65Full(t *testing.T) { testDeliverHeadersHang(t, eth.ETH65, FullSync) } +func TestDeliverHeadersHang65Fast(t *testing.T) { testDeliverHeadersHang(t, eth.ETH65, FastSync) } +func TestDeliverHeadersHang65Light(t *testing.T) { testDeliverHeadersHang(t, eth.ETH65, LightSync) } + func TestDeliverHeadersHang66Full(t *testing.T) { testDeliverHeadersHang(t, eth.ETH66, FullSync) } func TestDeliverHeadersHang66Fast(t *testing.T) { testDeliverHeadersHang(t, eth.ETH66, FastSync) } func TestDeliverHeadersHang66Light(t *testing.T) { testDeliverHeadersHang(t, eth.ETH66, LightSync) } @@ -1588,6 +1673,12 @@ func TestRemoteHeaderRequestSpan(t *testing.T) { // Tests that peers below a pre-configured checkpoint block are prevented from // being fast-synced from, avoiding potential cheap eclipse attacks. +func TestCheckpointEnforcement65Full(t *testing.T) { testCheckpointEnforcement(t, eth.ETH65, FullSync) } +func TestCheckpointEnforcement65Fast(t *testing.T) { testCheckpointEnforcement(t, eth.ETH65, FastSync) } +func TestCheckpointEnforcement65Light(t *testing.T) { + testCheckpointEnforcement(t, eth.ETH65, LightSync) +} + func TestCheckpointEnforcement66Full(t *testing.T) { testCheckpointEnforcement(t, eth.ETH66, FullSync) } func TestCheckpointEnforcement66Fast(t *testing.T) { testCheckpointEnforcement(t, eth.ETH66, FastSync) } func TestCheckpointEnforcement66Light(t *testing.T) { diff --git a/eth/downloader/peer.go b/eth/downloader/peer.go index 863294832..066a36631 100644 --- a/eth/downloader/peer.go +++ b/eth/downloader/peer.go @@ -413,7 +413,7 @@ func (ps *peerSet) HeaderIdlePeers() ([]*peerConnection, int) { throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.BlockHeadersMsg, time.Second) } - return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } // BodyIdlePeers retrieves a flat list of all the currently body-idle peers within @@ -425,7 +425,7 @@ func (ps *peerSet) BodyIdlePeers() ([]*peerConnection, int) { throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.BlockBodiesMsg, time.Second) } - return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } // ReceiptIdlePeers retrieves a flat list of all the currently receipt-idle peers @@ -437,7 +437,7 @@ func (ps *peerSet) ReceiptIdlePeers() ([]*peerConnection, int) { throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.ReceiptsMsg, time.Second) } - return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } // NodeDataIdlePeers retrieves a flat list of all the currently node-data-idle @@ -449,7 +449,7 @@ func (ps *peerSet) NodeDataIdlePeers() ([]*peerConnection, int) { throughput := func(p *peerConnection) int { return p.rates.Capacity(eth.NodeDataMsg, time.Second) } - return ps.idlePeers(eth.ETH66, eth.ETH66, idle, throughput) + return ps.idlePeers(eth.ETH65, eth.ETH66, idle, throughput) } // idlePeers retrieves a flat list of all currently idle peers satisfying the diff --git a/eth/handler.go b/eth/handler.go index 06a8088bf..aff4871af 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -117,6 +117,7 @@ type handler struct { whitelist map[uint64]common.Hash // channels for fetcher, syncer, txsyncLoop + txsyncCh chan *txsync quitSync chan struct{} chainSync *chainSyncer @@ -139,6 +140,7 @@ func newHandler(config *handlerConfig) (*handler, error) { chain: config.Chain, peers: newPeerSet(), whitelist: config.Whitelist, + txsyncCh: make(chan *txsync), quitSync: make(chan struct{}), } if config.Sync == downloader.FullSync { @@ -406,8 +408,9 @@ func (h *handler) Start(maxPeers int) { go h.minedBroadcastLoop() // start sync handlers - h.wg.Add(1) + h.wg.Add(2) go h.chainSync.loop() + go h.txsyncLoop64() // TODO(karalabe): Legacy initial tx echange, drop with eth/64. } func (h *handler) Stop() { diff --git a/eth/handler_eth_test.go b/eth/handler_eth_test.go index 039091244..038de4699 100644 --- a/eth/handler_eth_test.go +++ b/eth/handler_eth_test.go @@ -80,6 +80,7 @@ func (h *testEthHandler) Handle(peer *eth.Peer, packet eth.Packet) error { // Tests that peers are correctly accepted (or rejected) based on the advertised // fork IDs in the protocol handshake. +func TestForkIDSplit65(t *testing.T) { testForkIDSplit(t, eth.ETH65) } func TestForkIDSplit66(t *testing.T) { testForkIDSplit(t, eth.ETH66) } func testForkIDSplit(t *testing.T, protocol uint) { @@ -235,6 +236,7 @@ func testForkIDSplit(t *testing.T, protocol uint) { } // Tests that received transactions are added to the local pool. +func TestRecvTransactions65(t *testing.T) { testRecvTransactions(t, eth.ETH65) } func TestRecvTransactions66(t *testing.T) { testRecvTransactions(t, eth.ETH66) } func testRecvTransactions(t *testing.T, protocol uint) { @@ -292,6 +294,7 @@ func testRecvTransactions(t *testing.T, protocol uint) { } // This test checks that pending transactions are sent. +func TestSendTransactions65(t *testing.T) { testSendTransactions(t, eth.ETH65) } func TestSendTransactions66(t *testing.T) { testSendTransactions(t, eth.ETH66) } func testSendTransactions(t *testing.T, protocol uint) { @@ -303,7 +306,7 @@ func testSendTransactions(t *testing.T, protocol uint) { insert := make([]*types.Transaction, 100) for nonce := range insert { - tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, 10240)) + tx := types.NewTransaction(uint64(nonce), common.Address{}, big.NewInt(0), 100000, big.NewInt(0), make([]byte, txsyncPackSize/10)) tx, _ = types.SignTx(tx, types.HomesteadSigner{}, testKey) insert[nonce] = tx @@ -377,6 +380,7 @@ func testSendTransactions(t *testing.T, protocol uint) { // Tests that transactions get propagated to all attached peers, either via direct // broadcasts or via announcements/retrievals. +func TestTransactionPropagation65(t *testing.T) { testTransactionPropagation(t, eth.ETH65) } func TestTransactionPropagation66(t *testing.T) { testTransactionPropagation(t, eth.ETH66) } func testTransactionPropagation(t *testing.T, protocol uint) { @@ -517,8 +521,8 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo defer p2pLocal.Close() defer p2pRemote.Close() - local := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool) - remote := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool) + local := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{1}, "", nil, p2pLocal), p2pLocal, handler.txpool) + remote := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{2}, "", nil, p2pRemote), p2pRemote, handler.txpool) defer local.Close() defer remote.Close() @@ -539,39 +543,30 @@ func testCheckpointChallenge(t *testing.T, syncmode downloader.SyncMode, checkpo if err := remote.Handshake(1, td, head.Hash(), genesis.Hash(), forkid.NewIDWithChain(handler.chain), forkid.NewFilter(handler.chain)); err != nil { t.Fatalf("failed to run protocol handshake") } + // Connect a new peer and check that we receive the checkpoint challenge. if checkpoint { - msg, err := p2pRemote.ReadMsg() - if err != nil { - t.Fatalf("failed to read checkpoint challenge: %v", err) - } - request := new(eth.GetBlockHeadersPacket66) - if err := msg.Decode(request); err != nil { - t.Fatalf("failed to decode checkpoint challenge: %v", err) - } - query := request.GetBlockHeadersPacket - if query.Origin.Number != response.Number.Uint64() || query.Amount != 1 || query.Skip != 0 || query.Reverse { - t.Fatalf("challenge mismatch: have [%d, %d, %d, %v] want [%d, %d, %d, %v]", - query.Origin.Number, query.Amount, query.Skip, query.Reverse, - response.Number.Uint64(), 1, 0, false) + if err := remote.ExpectRequestHeadersByNumber(response.Number.Uint64(), 1, 0, false); err != nil { + t.Fatalf("challenge mismatch: %v", err) } // Create a block to reply to the challenge if no timeout is simulated. if !timeout { if empty { - if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{}); err != nil { + if err := remote.SendBlockHeaders([]*types.Header{}); err != nil { t.Fatalf("failed to answer challenge: %v", err) } } else if match { - if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{response}); err != nil { + if err := remote.SendBlockHeaders([]*types.Header{response}); err != nil { t.Fatalf("failed to answer challenge: %v", err) } } else { - if err := remote.ReplyBlockHeaders(request.RequestId, []*types.Header{{Number: response.Number}}); err != nil { + if err := remote.SendBlockHeaders([]*types.Header{{Number: response.Number}}); err != nil { t.Fatalf("failed to answer challenge: %v", err) } } } } + // Wait until the test timeout passes to ensure proper cleanup time.Sleep(syncChallengeTimeout + 300*time.Millisecond) @@ -624,8 +619,8 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { defer sourcePipe.Close() defer sinkPipe.Close() - sourcePeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) - sinkPeer := eth.NewPeer(eth.ETH66, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) + sourcePeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{byte(i)}, "", nil, sourcePipe), sourcePipe, nil) + sinkPeer := eth.NewPeer(eth.ETH65, p2p.NewPeerPipe(enode.ID{0}, "", nil, sinkPipe), sinkPipe, nil) defer sourcePeer.Close() defer sinkPeer.Close() @@ -676,6 +671,7 @@ func testBroadcastBlock(t *testing.T, peers, bcasts int) { // Tests that a propagated malformed block (uncles or transactions don't match // with the hashes in the header) gets discarded and not broadcast forward. +func TestBroadcastMalformedBlock65(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH65) } func TestBroadcastMalformedBlock66(t *testing.T) { testBroadcastMalformedBlock(t, eth.ETH66) } func testBroadcastMalformedBlock(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/handler.go b/eth/protocols/eth/handler.go index 828930014..6bbaa2f55 100644 --- a/eth/protocols/eth/handler.go +++ b/eth/protocols/eth/handler.go @@ -171,21 +171,39 @@ type Decoder interface { Time() time.Time } +var eth65 = map[uint64]msgHandler{ + GetBlockHeadersMsg: handleGetBlockHeaders, + BlockHeadersMsg: handleBlockHeaders, + GetBlockBodiesMsg: handleGetBlockBodies, + BlockBodiesMsg: handleBlockBodies, + GetNodeDataMsg: handleGetNodeData, + NodeDataMsg: handleNodeData, + GetReceiptsMsg: handleGetReceipts, + ReceiptsMsg: handleReceipts, + NewBlockHashesMsg: handleNewBlockhashes, + NewBlockMsg: handleNewBlock, + TransactionsMsg: handleTransactions, + NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, + GetPooledTransactionsMsg: handleGetPooledTransactions, + PooledTransactionsMsg: handlePooledTransactions, +} + var eth66 = map[uint64]msgHandler{ NewBlockHashesMsg: handleNewBlockhashes, NewBlockMsg: handleNewBlock, TransactionsMsg: handleTransactions, NewPooledTransactionHashesMsg: handleNewPooledTransactionHashes, - GetBlockHeadersMsg: handleGetBlockHeaders66, - BlockHeadersMsg: handleBlockHeaders66, - GetBlockBodiesMsg: handleGetBlockBodies66, - BlockBodiesMsg: handleBlockBodies66, - GetNodeDataMsg: handleGetNodeData66, - NodeDataMsg: handleNodeData66, - GetReceiptsMsg: handleGetReceipts66, - ReceiptsMsg: handleReceipts66, - GetPooledTransactionsMsg: handleGetPooledTransactions66, - PooledTransactionsMsg: handlePooledTransactions66, + // eth66 messages with request-id + GetBlockHeadersMsg: handleGetBlockHeaders66, + BlockHeadersMsg: handleBlockHeaders66, + GetBlockBodiesMsg: handleGetBlockBodies66, + BlockBodiesMsg: handleBlockBodies66, + GetNodeDataMsg: handleGetNodeData66, + NodeDataMsg: handleNodeData66, + GetReceiptsMsg: handleGetReceipts66, + ReceiptsMsg: handleReceipts66, + GetPooledTransactionsMsg: handleGetPooledTransactions66, + PooledTransactionsMsg: handlePooledTransactions66, } // handleMessage is invoked whenever an inbound message is received from a remote @@ -201,11 +219,10 @@ func handleMessage(backend Backend, peer *Peer) error { } defer msg.Discard() - var handlers = eth66 - //if peer.Version() >= ETH67 { // Left in as a sample when new protocol is added - // handlers = eth67 - //} - + var handlers = eth65 + if peer.Version() >= ETH66 { + handlers = eth66 + } // Track the amount of time it takes to serve the request and run the handler if metrics.Enabled { h := fmt.Sprintf("%s/%s/%d/%#02x", p2p.HandleHistName, ProtocolName, peer.Version(), msg.Code) diff --git a/eth/protocols/eth/handler_test.go b/eth/protocols/eth/handler_test.go index 809f17e36..473be3f9b 100644 --- a/eth/protocols/eth/handler_test.go +++ b/eth/protocols/eth/handler_test.go @@ -110,6 +110,7 @@ func (b *testBackend) Handle(*Peer, Packet) error { } // Tests that block headers can be retrieved from a remote chain based on user queries. +func TestGetBlockHeaders65(t *testing.T) { testGetBlockHeaders(t, ETH65) } func TestGetBlockHeaders66(t *testing.T) { testGetBlockHeaders(t, ETH66) } func testGetBlockHeaders(t *testing.T, protocol uint) { @@ -253,30 +254,44 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { headers = append(headers, backend.chain.GetBlockByHash(hash).Header()) } // Send the hash request and verify the response - p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ - RequestId: 123, - GetBlockHeadersPacket: tt.query, - }) - if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{ - RequestId: 123, - BlockHeadersPacket: headers, - }); err != nil { - t.Errorf("test %d: headers mismatch: %v", i, err) + if protocol <= ETH65 { + p2p.Send(peer.app, GetBlockHeadersMsg, tt.query) + if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + } else { + p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ + RequestId: 123, + GetBlockHeadersPacket: tt.query, + }) + if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{ + RequestId: 123, + BlockHeadersPacket: headers, + }); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } } // If the test used number origins, repeat with hashes as the too if tt.query.Origin.Hash == (common.Hash{}) { if origin := backend.chain.GetBlockByNumber(tt.query.Origin.Number); origin != nil { tt.query.Origin.Hash, tt.query.Origin.Number = origin.Hash(), 0 - p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ - RequestId: 456, - GetBlockHeadersPacket: tt.query, - }) - if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{ - RequestId: 456, - BlockHeadersPacket: headers, - }); err != nil { - t.Errorf("test %d: headers mismatch: %v", i, err) + if protocol <= ETH65 { + p2p.Send(peer.app, GetBlockHeadersMsg, tt.query) + if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, headers); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } + } else { + p2p.Send(peer.app, GetBlockHeadersMsg, GetBlockHeadersPacket66{ + RequestId: 456, + GetBlockHeadersPacket: tt.query, + }) + if err := p2p.ExpectMsg(peer.app, BlockHeadersMsg, BlockHeadersPacket66{ + RequestId: 456, + BlockHeadersPacket: headers, + }); err != nil { + t.Errorf("test %d: headers mismatch: %v", i, err) + } } } } @@ -284,6 +299,7 @@ func testGetBlockHeaders(t *testing.T, protocol uint) { } // Tests that block contents can be retrieved from a remote chain based on their hashes. +func TestGetBlockBodies65(t *testing.T) { testGetBlockBodies(t, ETH65) } func TestGetBlockBodies66(t *testing.T) { testGetBlockBodies(t, ETH66) } func testGetBlockBodies(t *testing.T, protocol uint) { @@ -353,20 +369,28 @@ func testGetBlockBodies(t *testing.T, protocol uint) { } } // Send the hash request and verify the response - p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{ - RequestId: 123, - GetBlockBodiesPacket: hashes, - }) - if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, BlockBodiesPacket66{ - RequestId: 123, - BlockBodiesPacket: bodies, - }); err != nil { - t.Errorf("test %d: bodies mismatch: %v", i, err) + if protocol <= ETH65 { + p2p.Send(peer.app, GetBlockBodiesMsg, hashes) + if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, bodies); err != nil { + t.Errorf("test %d: bodies mismatch: %v", i, err) + } + } else { + p2p.Send(peer.app, GetBlockBodiesMsg, GetBlockBodiesPacket66{ + RequestId: 123, + GetBlockBodiesPacket: hashes, + }) + if err := p2p.ExpectMsg(peer.app, BlockBodiesMsg, BlockBodiesPacket66{ + RequestId: 123, + BlockBodiesPacket: bodies, + }); err != nil { + t.Errorf("test %d: bodies mismatch: %v", i, err) + } } } } // Tests that the state trie nodes can be retrieved based on hashes. +func TestGetNodeData65(t *testing.T) { testGetNodeData(t, ETH65) } func TestGetNodeData66(t *testing.T) { testGetNodeData(t, ETH66) } func testGetNodeData(t *testing.T, protocol uint) { @@ -425,10 +449,14 @@ func testGetNodeData(t *testing.T, protocol uint) { } it.Release() - p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{ - RequestId: 123, - GetNodeDataPacket: hashes, - }) + if protocol <= ETH65 { + p2p.Send(peer.app, GetNodeDataMsg, hashes) + } else { + p2p.Send(peer.app, GetNodeDataMsg, GetNodeDataPacket66{ + RequestId: 123, + GetNodeDataPacket: hashes, + }) + } msg, err := peer.app.ReadMsg() if err != nil { t.Fatalf("failed to read node data response: %v", err) @@ -436,14 +464,18 @@ func testGetNodeData(t *testing.T, protocol uint) { if msg.Code != NodeDataMsg { t.Fatalf("response packet code mismatch: have %x, want %x", msg.Code, NodeDataMsg) } - var ( - data [][]byte - res NodeDataPacket66 - ) - if err := msg.Decode(&res); err != nil { - t.Fatalf("failed to decode response node data: %v", err) + var data [][]byte + if protocol <= ETH65 { + if err := msg.Decode(&data); err != nil { + t.Fatalf("failed to decode response node data: %v", err) + } + } else { + var res NodeDataPacket66 + if err := msg.Decode(&res); err != nil { + t.Fatalf("failed to decode response node data: %v", err) + } + data = res.NodeDataPacket } - data = res.NodeDataPacket // Verify that all hashes correspond to the requested data, and reconstruct a state tree for i, want := range hashes { if hash := crypto.Keccak256Hash(data[i]); hash != want { @@ -474,6 +506,7 @@ func testGetNodeData(t *testing.T, protocol uint) { } // Tests that the transaction receipts can be retrieved based on hashes. +func TestGetBlockReceipts65(t *testing.T) { testGetBlockReceipts(t, ETH65) } func TestGetBlockReceipts66(t *testing.T) { testGetBlockReceipts(t, ETH66) } func testGetBlockReceipts(t *testing.T, protocol uint) { @@ -533,14 +566,21 @@ func testGetBlockReceipts(t *testing.T, protocol uint) { receipts = append(receipts, backend.chain.GetReceiptsByHash(block.Hash())) } // Send the hash request and verify the response - p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{ - RequestId: 123, - GetReceiptsPacket: hashes, - }) - if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, ReceiptsPacket66{ - RequestId: 123, - ReceiptsPacket: receipts, - }); err != nil { - t.Errorf("receipts mismatch: %v", err) + if protocol <= ETH65 { + p2p.Send(peer.app, GetReceiptsMsg, hashes) + if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, receipts); err != nil { + t.Errorf("receipts mismatch: %v", err) + } + } else { + p2p.Send(peer.app, GetReceiptsMsg, GetReceiptsPacket66{ + RequestId: 123, + GetReceiptsPacket: hashes, + }) + if err := p2p.ExpectMsg(peer.app, ReceiptsMsg, ReceiptsPacket66{ + RequestId: 123, + ReceiptsPacket: receipts, + }); err != nil { + t.Errorf("receipts mismatch: %v", err) + } } } diff --git a/eth/protocols/eth/handlers.go b/eth/protocols/eth/handlers.go index e54838cbc..d7d993a23 100644 --- a/eth/protocols/eth/handlers.go +++ b/eth/protocols/eth/handlers.go @@ -27,6 +27,17 @@ import ( "github.com/ethereum/go-ethereum/trie" ) +// handleGetBlockHeaders handles Block header query, collect the requested headers and reply +func handleGetBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { + // Decode the complex header query + var query GetBlockHeadersPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockHeadersQuery(backend, &query, peer) + return peer.SendBlockHeaders(response) +} + // handleGetBlockHeaders66 is the eth/66 version of handleGetBlockHeaders func handleGetBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { // Decode the complex header query @@ -124,6 +135,16 @@ func answerGetBlockHeadersQuery(backend Backend, query *GetBlockHeadersPacket, p return headers } +func handleGetBlockBodies(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block body retrieval message + var query GetBlockBodiesPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetBlockBodiesQuery(backend, query, peer) + return peer.SendBlockBodiesRLP(response) +} + func handleGetBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { // Decode the block body retrieval message var query GetBlockBodiesPacket66 @@ -153,6 +174,16 @@ func answerGetBlockBodiesQuery(backend Backend, query GetBlockBodiesPacket, peer return bodies } +func handleGetNodeData(backend Backend, msg Decoder, peer *Peer) error { + // Decode the trie node data retrieval message + var query GetNodeDataPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetNodeDataQuery(backend, query, peer) + return peer.SendNodeData(response) +} + func handleGetNodeData66(backend Backend, msg Decoder, peer *Peer) error { // Decode the trie node data retrieval message var query GetNodeDataPacket66 @@ -192,6 +223,16 @@ func answerGetNodeDataQuery(backend Backend, query GetNodeDataPacket, peer *Peer return nodes } +func handleGetReceipts(backend Backend, msg Decoder, peer *Peer) error { + // Decode the block receipts retrieval message + var query GetReceiptsPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + response := answerGetReceiptsQuery(backend, query, peer) + return peer.SendReceiptsRLP(response) +} + func handleGetReceipts66(backend Backend, msg Decoder, peer *Peer) error { // Decode the block receipts retrieval message var query GetReceiptsPacket66 @@ -271,6 +312,15 @@ func handleNewBlock(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, ann) } +func handleBlockHeaders(backend Backend, msg Decoder, peer *Peer) error { + // A batch of headers arrived to one of our previous requests + res := new(BlockHeadersPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { // A batch of headers arrived to one of our previous requests res := new(BlockHeadersPacket66) @@ -282,6 +332,15 @@ func handleBlockHeaders66(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, &res.BlockHeadersPacket) } +func handleBlockBodies(backend Backend, msg Decoder, peer *Peer) error { + // A batch of block bodies arrived to one of our previous requests + res := new(BlockBodiesPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { // A batch of block bodies arrived to one of our previous requests res := new(BlockBodiesPacket66) @@ -293,6 +352,15 @@ func handleBlockBodies66(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, &res.BlockBodiesPacket) } +func handleNodeData(backend Backend, msg Decoder, peer *Peer) error { + // A batch of node state data arrived to one of our previous requests + res := new(NodeDataPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { // A batch of node state data arrived to one of our previous requests res := new(NodeDataPacket66) @@ -304,6 +372,15 @@ func handleNodeData66(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, &res.NodeDataPacket) } +func handleReceipts(backend Backend, msg Decoder, peer *Peer) error { + // A batch of receipts arrived to one of our previous requests + res := new(ReceiptsPacket) + if err := msg.Decode(res); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + return backend.Handle(peer, res) +} + func handleReceipts66(backend Backend, msg Decoder, peer *Peer) error { // A batch of receipts arrived to one of our previous requests res := new(ReceiptsPacket66) @@ -332,6 +409,16 @@ func handleNewPooledTransactionHashes(backend Backend, msg Decoder, peer *Peer) return backend.Handle(peer, ann) } +func handleGetPooledTransactions(backend Backend, msg Decoder, peer *Peer) error { + // Decode the pooled transactions retrieval message + var query GetPooledTransactionsPacket + if err := msg.Decode(&query); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + hashes, txs := answerGetPooledTransactions(backend, query, peer) + return peer.SendPooledTransactionsRLP(hashes, txs) +} + func handleGetPooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { // Decode the pooled transactions retrieval message var query GetPooledTransactionsPacket66 @@ -390,6 +477,26 @@ func handleTransactions(backend Backend, msg Decoder, peer *Peer) error { return backend.Handle(peer, &txs) } +func handlePooledTransactions(backend Backend, msg Decoder, peer *Peer) error { + // Transactions arrived, make sure we have a valid and fresh chain to handle them + if !backend.AcceptTxs() { + return nil + } + // Transactions can be processed, parse all of them and deliver to the pool + var txs PooledTransactionsPacket + if err := msg.Decode(&txs); err != nil { + return fmt.Errorf("%w: message %v: %v", errDecode, msg, err) + } + for i, tx := range txs { + // Validate and mark the remote transaction + if tx == nil { + return fmt.Errorf("%w: transaction %d is nil", errDecode, i) + } + peer.markTransaction(tx.Hash()) + } + return backend.Handle(peer, &txs) +} + func handlePooledTransactions66(backend Backend, msg Decoder, peer *Peer) error { // Transactions arrived, make sure we have a valid and fresh chain to handle them if !backend.AcceptTxs() { diff --git a/eth/protocols/eth/handshake_test.go b/eth/protocols/eth/handshake_test.go index 05d473e05..3bebda2dc 100644 --- a/eth/protocols/eth/handshake_test.go +++ b/eth/protocols/eth/handshake_test.go @@ -27,6 +27,7 @@ import ( ) // Tests that handshake failures are detected and reported correctly. +func TestHandshake65(t *testing.T) { testHandshake(t, ETH65) } func TestHandshake66(t *testing.T) { testHandshake(t, ETH66) } func testHandshake(t *testing.T, protocol uint) { diff --git a/eth/protocols/eth/peer.go b/eth/protocols/eth/peer.go index 98273ccfc..e619c183b 100644 --- a/eth/protocols/eth/peer.go +++ b/eth/protocols/eth/peer.go @@ -108,8 +108,9 @@ func NewPeer(version uint, p *p2p.Peer, rw p2p.MsgReadWriter, txpool TxPool) *Pe // Start up all the broadcasters go peer.broadcastBlocks() go peer.broadcastTransactions() - go peer.announceTransactions() - + if version >= ETH65 { + go peer.announceTransactions() + } return peer } @@ -251,6 +252,22 @@ func (p *Peer) AsyncSendPooledTransactionHashes(hashes []common.Hash) { } } +// SendPooledTransactionsRLP sends requested transactions to the peer and adds the +// hashes in its transaction hash set for future reference. +// +// Note, the method assumes the hashes are correct and correspond to the list of +// transactions being sent. +func (p *Peer) SendPooledTransactionsRLP(hashes []common.Hash, txs []rlp.RawValue) error { + // Mark all the transactions as known, but ensure we don't overflow our limits + for p.knownTxs.Cardinality() > max(0, maxKnownTxs-len(hashes)) { + p.knownTxs.Pop() + } + for _, hash := range hashes { + p.knownTxs.Add(hash) + } + return p2p.Send(p.rw, PooledTransactionsMsg, txs) // Not packed into PooledTransactionsPacket to avoid RLP decoding +} + // ReplyPooledTransactionsRLP is the eth/66 version of SendPooledTransactionsRLP. func (p *Peer) ReplyPooledTransactionsRLP(id uint64, hashes []common.Hash, txs []rlp.RawValue) error { // Mark all the transactions as known, but ensure we don't overflow our limits @@ -329,6 +346,11 @@ func (p *Peer) AsyncSendNewBlock(block *types.Block, td *big.Int) { } } +// SendBlockHeaders sends a batch of block headers to the remote peer. +func (p *Peer) SendBlockHeaders(headers []*types.Header) error { + return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket(headers)) +} + // ReplyBlockHeaders is the eth/66 version of SendBlockHeaders. func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error { return p2p.Send(p.rw, BlockHeadersMsg, BlockHeadersPacket66{ @@ -337,6 +359,12 @@ func (p *Peer) ReplyBlockHeaders(id uint64, headers []*types.Header) error { }) } +// SendBlockBodiesRLP sends a batch of block contents to the remote peer from +// an already RLP encoded format. +func (p *Peer) SendBlockBodiesRLP(bodies []rlp.RawValue) error { + return p2p.Send(p.rw, BlockBodiesMsg, bodies) // Not packed into BlockBodiesPacket to avoid RLP decoding +} + // ReplyBlockBodiesRLP is the eth/66 version of SendBlockBodiesRLP. func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { // Not packed into BlockBodiesPacket to avoid RLP decoding @@ -346,6 +374,12 @@ func (p *Peer) ReplyBlockBodiesRLP(id uint64, bodies []rlp.RawValue) error { }) } +// SendNodeDataRLP sends a batch of arbitrary internal data, corresponding to the +// hashes requested. +func (p *Peer) SendNodeData(data [][]byte) error { + return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket(data)) +} + // ReplyNodeData is the eth/66 response to GetNodeData. func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { return p2p.Send(p.rw, NodeDataMsg, NodeDataPacket66{ @@ -354,6 +388,12 @@ func (p *Peer) ReplyNodeData(id uint64, data [][]byte) error { }) } +// SendReceiptsRLP sends a batch of transaction receipts, corresponding to the +// ones requested from an already RLP encoded format. +func (p *Peer) SendReceiptsRLP(receipts []rlp.RawValue) error { + return p2p.Send(p.rw, ReceiptsMsg, receipts) // Not packed into ReceiptsPacket to avoid RLP decoding +} + // ReplyReceiptsRLP is the eth/66 response to GetReceipts. func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { return p2p.Send(p.rw, ReceiptsMsg, ReceiptsRLPPacket66{ @@ -366,102 +406,138 @@ func (p *Peer) ReplyReceiptsRLP(id uint64, receipts []rlp.RawValue) error { // single header. It is used solely by the fetcher. func (p *Peer) RequestOneHeader(hash common.Hash) error { p.Log().Debug("Fetching single header", "hash", hash) - id := rand.Uint64() + query := GetBlockHeadersPacket{ + Origin: HashOrNumber{Hash: hash}, + Amount: uint64(1), + Skip: uint64(0), + Reverse: false, + } + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: id, - GetBlockHeadersPacket: &GetBlockHeadersPacket{ - Origin: HashOrNumber{Hash: hash}, - Amount: uint64(1), - Skip: uint64(0), - Reverse: false, - }, - }) + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: id, + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) } // RequestHeadersByHash fetches a batch of blocks' headers corresponding to the // specified header query, based on the hash of an origin block. func (p *Peer) RequestHeadersByHash(origin common.Hash, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromhash", origin, "skip", skip, "reverse", reverse) - id := rand.Uint64() + query := GetBlockHeadersPacket{ + Origin: HashOrNumber{Hash: origin}, + Amount: uint64(amount), + Skip: uint64(skip), + Reverse: reverse, + } + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: id, - GetBlockHeadersPacket: &GetBlockHeadersPacket{ - Origin: HashOrNumber{Hash: origin}, - Amount: uint64(amount), - Skip: uint64(skip), - Reverse: reverse, - }, - }) + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: id, + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) } // RequestHeadersByNumber fetches a batch of blocks' headers corresponding to the // specified header query, based on the number of an origin block. func (p *Peer) RequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { p.Log().Debug("Fetching batch of headers", "count", amount, "fromnum", origin, "skip", skip, "reverse", reverse) - id := rand.Uint64() + query := GetBlockHeadersPacket{ + Origin: HashOrNumber{Number: origin}, + Amount: uint64(amount), + Skip: uint64(skip), + Reverse: reverse, + } + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) - return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ - RequestId: id, - GetBlockHeadersPacket: &GetBlockHeadersPacket{ - Origin: HashOrNumber{Number: origin}, - Amount: uint64(amount), - Skip: uint64(skip), - Reverse: reverse, - }, - }) + requestTracker.Track(p.id, p.version, GetBlockHeadersMsg, BlockHeadersMsg, id) + return p2p.Send(p.rw, GetBlockHeadersMsg, &GetBlockHeadersPacket66{ + RequestId: id, + GetBlockHeadersPacket: &query, + }) + } + return p2p.Send(p.rw, GetBlockHeadersMsg, &query) +} + +// ExpectRequestHeadersByNumber is a testing method to mirror the recipient side +// of the RequestHeadersByNumber operation. +func (p *Peer) ExpectRequestHeadersByNumber(origin uint64, amount int, skip int, reverse bool) error { + req := &GetBlockHeadersPacket{ + Origin: HashOrNumber{Number: origin}, + Amount: uint64(amount), + Skip: uint64(skip), + Reverse: reverse, + } + return p2p.ExpectMsg(p.rw, GetBlockHeadersMsg, req) } // RequestBodies fetches a batch of blocks' bodies corresponding to the hashes // specified. func (p *Peer) RequestBodies(hashes []common.Hash) error { p.Log().Debug("Fetching batch of block bodies", "count", len(hashes)) - id := rand.Uint64() + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id) - return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ - RequestId: id, - GetBlockBodiesPacket: hashes, - }) + requestTracker.Track(p.id, p.version, GetBlockBodiesMsg, BlockBodiesMsg, id) + return p2p.Send(p.rw, GetBlockBodiesMsg, &GetBlockBodiesPacket66{ + RequestId: id, + GetBlockBodiesPacket: hashes, + }) + } + return p2p.Send(p.rw, GetBlockBodiesMsg, GetBlockBodiesPacket(hashes)) } // RequestNodeData fetches a batch of arbitrary data from a node's known state // data, corresponding to the specified hashes. func (p *Peer) RequestNodeData(hashes []common.Hash) error { p.Log().Debug("Fetching batch of state data", "count", len(hashes)) - id := rand.Uint64() + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id) - return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ - RequestId: id, - GetNodeDataPacket: hashes, - }) + requestTracker.Track(p.id, p.version, GetNodeDataMsg, NodeDataMsg, id) + return p2p.Send(p.rw, GetNodeDataMsg, &GetNodeDataPacket66{ + RequestId: id, + GetNodeDataPacket: hashes, + }) + } + return p2p.Send(p.rw, GetNodeDataMsg, GetNodeDataPacket(hashes)) } // RequestReceipts fetches a batch of transaction receipts from a remote node. func (p *Peer) RequestReceipts(hashes []common.Hash) error { p.Log().Debug("Fetching batch of receipts", "count", len(hashes)) - id := rand.Uint64() + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id) - return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ - RequestId: id, - GetReceiptsPacket: hashes, - }) + requestTracker.Track(p.id, p.version, GetReceiptsMsg, ReceiptsMsg, id) + return p2p.Send(p.rw, GetReceiptsMsg, &GetReceiptsPacket66{ + RequestId: id, + GetReceiptsPacket: hashes, + }) + } + return p2p.Send(p.rw, GetReceiptsMsg, GetReceiptsPacket(hashes)) } // RequestTxs fetches a batch of transactions from a remote node. func (p *Peer) RequestTxs(hashes []common.Hash) error { p.Log().Debug("Fetching batch of transactions", "count", len(hashes)) - id := rand.Uint64() + if p.Version() >= ETH66 { + id := rand.Uint64() - requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) - return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ - RequestId: id, - GetPooledTransactionsPacket: hashes, - }) + requestTracker.Track(p.id, p.version, GetPooledTransactionsMsg, PooledTransactionsMsg, id) + return p2p.Send(p.rw, GetPooledTransactionsMsg, &GetPooledTransactionsPacket66{ + RequestId: id, + GetPooledTransactionsPacket: hashes, + }) + } + return p2p.Send(p.rw, GetPooledTransactionsMsg, GetPooledTransactionsPacket(hashes)) } diff --git a/eth/protocols/eth/protocol.go b/eth/protocols/eth/protocol.go index 3c3da30fa..de1b0ed1e 100644 --- a/eth/protocols/eth/protocol.go +++ b/eth/protocols/eth/protocol.go @@ -30,6 +30,7 @@ import ( // Constants to match up protocol versions and messages const ( + ETH65 = 65 ETH66 = 66 ) @@ -39,28 +40,31 @@ const ProtocolName = "eth" // ProtocolVersions are the supported versions of the `eth` protocol (first // is primary). -var ProtocolVersions = []uint{ETH66} +var ProtocolVersions = []uint{ETH66, ETH65} // protocolLengths are the number of implemented message corresponding to // different protocol versions. -var protocolLengths = map[uint]uint64{ETH66: 17} +var protocolLengths = map[uint]uint64{ETH66: 17, ETH65: 17} // maxMessageSize is the maximum cap on the size of a protocol message. const maxMessageSize = 10 * 1024 * 1024 const ( - StatusMsg = 0x00 - NewBlockHashesMsg = 0x01 - TransactionsMsg = 0x02 - GetBlockHeadersMsg = 0x03 - BlockHeadersMsg = 0x04 - GetBlockBodiesMsg = 0x05 - BlockBodiesMsg = 0x06 - NewBlockMsg = 0x07 - GetNodeDataMsg = 0x0d - NodeDataMsg = 0x0e - GetReceiptsMsg = 0x0f - ReceiptsMsg = 0x10 + // Protocol messages in eth/64 + StatusMsg = 0x00 + NewBlockHashesMsg = 0x01 + TransactionsMsg = 0x02 + GetBlockHeadersMsg = 0x03 + BlockHeadersMsg = 0x04 + GetBlockBodiesMsg = 0x05 + BlockBodiesMsg = 0x06 + NewBlockMsg = 0x07 + GetNodeDataMsg = 0x0d + NodeDataMsg = 0x0e + GetReceiptsMsg = 0x0f + ReceiptsMsg = 0x10 + + // Protocol messages overloaded in eth/65 NewPooledTransactionHashesMsg = 0x08 GetPooledTransactionsMsg = 0x09 PooledTransactionsMsg = 0x0a @@ -124,7 +128,7 @@ type GetBlockHeadersPacket struct { Reverse bool // Query direction (false = rising towards latest, true = falling towards genesis) } -// GetBlockHeadersPacket66 represents a block header query over eth/66 +// GetBlockHeadersPacket represents a block header query over eth/66 type GetBlockHeadersPacket66 struct { RequestId uint64 *GetBlockHeadersPacket diff --git a/eth/sync.go b/eth/sync.go index 27941158f..ab114b59f 100644 --- a/eth/sync.go +++ b/eth/sync.go @@ -18,6 +18,7 @@ package eth import ( "math/big" + "math/rand" "sync/atomic" "time" @@ -27,13 +28,23 @@ import ( "github.com/ethereum/go-ethereum/eth/downloader" "github.com/ethereum/go-ethereum/eth/protocols/eth" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/p2p/enode" ) const ( forceSyncCycle = 10 * time.Second // Time interval to force syncs, even if few peers are available defaultMinSyncPeers = 5 // Amount of peers desired to start syncing + + // This is the target size for the packs of transactions sent by txsyncLoop64. + // A pack can get larger than this if a single transactions exceeds this size. + txsyncPackSize = 100 * 1024 ) +type txsync struct { + p *eth.Peer + txs []*types.Transaction +} + // syncTransactions starts sending all currently pending transactions to the given peer. func (h *handler) syncTransactions(p *eth.Peer) { // Assemble the set of transaction to broadcast or announce to the remote @@ -53,11 +64,94 @@ func (h *handler) syncTransactions(p *eth.Peer) { // The eth/65 protocol introduces proper transaction announcements, so instead // of dripping transactions across multiple peers, just send the entire list as // an announcement and let the remote side decide what they need (likely nothing). - hashes := make([]common.Hash, len(txs)) - for i, tx := range txs { - hashes[i] = tx.Hash() + if p.Version() >= eth.ETH65 { + hashes := make([]common.Hash, len(txs)) + for i, tx := range txs { + hashes[i] = tx.Hash() + } + p.AsyncSendPooledTransactionHashes(hashes) + return + } + // Out of luck, peer is running legacy protocols, drop the txs over + select { + case h.txsyncCh <- &txsync{p: p, txs: txs}: + case <-h.quitSync: + } +} + +// txsyncLoop64 takes care of the initial transaction sync for each new +// connection. When a new peer appears, we relay all currently pending +// transactions. In order to minimise egress bandwidth usage, we send +// the transactions in small packs to one peer at a time. +func (h *handler) txsyncLoop64() { + defer h.wg.Done() + + var ( + pending = make(map[enode.ID]*txsync) + sending = false // whether a send is active + pack = new(txsync) // the pack that is being sent + done = make(chan error, 1) // result of the send + ) + + // send starts a sending a pack of transactions from the sync. + send := func(s *txsync) { + if s.p.Version() >= eth.ETH65 { + panic("initial transaction syncer running on eth/65+") + } + // Fill pack with transactions up to the target size. + size := common.StorageSize(0) + pack.p = s.p + pack.txs = pack.txs[:0] + for i := 0; i < len(s.txs) && size < txsyncPackSize; i++ { + pack.txs = append(pack.txs, s.txs[i]) + size += s.txs[i].Size() + } + // Remove the transactions that will be sent. + s.txs = s.txs[:copy(s.txs, s.txs[len(pack.txs):])] + if len(s.txs) == 0 { + delete(pending, s.p.Peer.ID()) + } + // Send the pack in the background. + s.p.Log().Trace("Sending batch of transactions", "count", len(pack.txs), "bytes", size) + sending = true + go func() { done <- pack.p.SendTransactions(pack.txs) }() + } + // pick chooses the next pending sync. + pick := func() *txsync { + if len(pending) == 0 { + return nil + } + n := rand.Intn(len(pending)) + 1 + for _, s := range pending { + if n--; n == 0 { + return s + } + } + return nil + } + + for { + select { + case s := <-h.txsyncCh: + pending[s.p.Peer.ID()] = s + if !sending { + send(s) + } + case err := <-done: + sending = false + // Stop tracking peers that cause send failures. + if err != nil { + pack.p.Log().Debug("Transaction send failed", "err", err) + delete(pending, pack.p.Peer.ID()) + } + // Schedule the next send. + if s := pick(); s != nil { + send(s) + } + case <-h.quitSync: + return + } } - p.AsyncSendPooledTransactionHashes(hashes) } // chainSyncer coordinates blockchain sync components. diff --git a/eth/sync_test.go b/eth/sync_test.go index e96b9ee81..a0c6f8602 100644 --- a/eth/sync_test.go +++ b/eth/sync_test.go @@ -28,6 +28,7 @@ import ( ) // Tests that fast sync is disabled after a successful sync cycle. +func TestFastSyncDisabling65(t *testing.T) { testFastSyncDisabling(t, eth.ETH65) } func TestFastSyncDisabling66(t *testing.T) { testFastSyncDisabling(t, eth.ETH66) } // Tests that fast sync gets disabled as soon as a real block is successfully diff --git a/les/client_handler.go b/les/client_handler.go index b903b1106..e95996c51 100644 --- a/les/client_handler.go +++ b/les/client_handler.go @@ -472,7 +472,7 @@ func (d *downloaderPeerNotify) registerPeer(p *serverPeer) { handler: h, peer: p, } - h.downloader.RegisterLightPeer(p.id, eth.ETH66, pc) + h.downloader.RegisterLightPeer(p.id, eth.ETH65, pc) } func (d *downloaderPeerNotify) unregisterPeer(p *serverPeer) {