diff --git a/eth/handler.go b/eth/handler.go index 551781ef0..1f62d820e 100644 --- a/eth/handler.go +++ b/eth/handler.go @@ -49,6 +49,9 @@ const ( // txChanSize is the size of channel listening to NewTxsEvent. // The number is referenced from the size of tx pool. txChanSize = 4096 + + // minimim number of peers to broadcast new blocks to + minBroadcastPeers = 4 ) var ( @@ -705,7 +708,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) { return } // Send the block to a subset of our peers - transfer := peers[:int(math.Sqrt(float64(len(peers))))] + transferLen := int(math.Sqrt(float64(len(peers)))) + if transferLen < minBroadcastPeers { + transferLen = minBroadcastPeers + } + if transferLen > len(peers) { + transferLen = len(peers) + } + transfer := peers[:transferLen] for _, peer := range transfer { peer.AsyncSendNewBlock(block, td) } diff --git a/eth/handler_test.go b/eth/handler_test.go index 0885a0448..7811cd480 100644 --- a/eth/handler_test.go +++ b/eth/handler_test.go @@ -17,6 +17,7 @@ package eth import ( + "fmt" "math" "math/big" "math/rand" @@ -466,14 +467,17 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool } // Create a DAO aware protocol manager var ( - evmux = new(event.TypeMux) - pow = ethash.NewFaker() - db = ethdb.NewMemDatabase() - config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} - gspec = &core.Genesis{Config: config} - genesis = gspec.MustCommit(db) - blockchain, _ = core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil) + evmux = new(event.TypeMux) + pow = ethash.NewFaker() + db = ethdb.NewMemDatabase() + config = ¶ms.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} + gspec = &core.Genesis{Config: config} + genesis = gspec.MustCommit(db) ) + blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil) + if err != nil { + t.Fatalf("failed to create new blockchain: %v", err) + } pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db) if err != nil { t.Fatalf("failed to start test protocol manager: %v", err) @@ -520,3 +524,90 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool } } } + +func TestBroadcastBlock(t *testing.T) { + var tests = []struct { + totalPeers int + broadcastExpected int + }{ + {1, 1}, + {2, 2}, + {3, 3}, + {4, 4}, + {5, 4}, + {9, 4}, + {12, 4}, + {16, 4}, + {26, 5}, + {100, 10}, + } + for _, test := range tests { + testBroadcastBlock(t, test.totalPeers, test.broadcastExpected) + } +} + +func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) { + var ( + evmux = new(event.TypeMux) + pow = ethash.NewFaker() + db = ethdb.NewMemDatabase() + config = ¶ms.ChainConfig{} + gspec = &core.Genesis{Config: config} + genesis = gspec.MustCommit(db) + ) + blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil) + if err != nil { + t.Fatalf("failed to create new blockchain: %v", err) + } + pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db) + if err != nil { + t.Fatalf("failed to start test protocol manager: %v", err) + } + pm.Start(1000) + defer pm.Stop() + var peers []*testPeer + for i := 0; i < totalPeers; i++ { + peer, _ := newTestPeer(fmt.Sprintf("peer %d", i), eth63, pm, true) + defer peer.close() + peers = append(peers, peer) + } + chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 1, func(i int, gen *core.BlockGen) {}) + pm.BroadcastBlock(chain[0], true /*propagate*/) + + errCh := make(chan error, totalPeers) + doneCh := make(chan struct{}, totalPeers) + for _, peer := range peers { + go func(p *testPeer) { + if err := p2p.ExpectMsg(p.app, NewBlockMsg, &newBlockData{Block: chain[0], TD: big.NewInt(131136)}); err != nil { + errCh <- err + } else { + doneCh <- struct{}{} + } + }(peer) + } + timeoutCh := time.NewTimer(time.Millisecond * 100).C + var receivedCount int +outer: + for { + select { + case err = <-errCh: + break outer + case <-doneCh: + receivedCount++ + if receivedCount == totalPeers { + break outer + } + case <-timeoutCh: + break outer + } + } + for _, peer := range peers { + peer.app.Close() + } + if err != nil { + t.Errorf("error matching block by peer: %v", err) + } + if receivedCount != broadcastExpected { + t.Errorf("block broadcast to %d peers, expected %d", receivedCount, broadcastExpected) + } +}