eth: broadcast blocks to at least 4 peers (#17725)

This commit is contained in:
ledgerwatch 2018-09-29 21:17:06 +01:00 committed by Felix Lange
parent 01d9f29805
commit 3d782bc727
2 changed files with 109 additions and 8 deletions

View File

@ -49,6 +49,9 @@ const (
// txChanSize is the size of channel listening to NewTxsEvent. // txChanSize is the size of channel listening to NewTxsEvent.
// The number is referenced from the size of tx pool. // The number is referenced from the size of tx pool.
txChanSize = 4096 txChanSize = 4096
// minimim number of peers to broadcast new blocks to
minBroadcastPeers = 4
) )
var ( var (
@ -705,7 +708,14 @@ func (pm *ProtocolManager) BroadcastBlock(block *types.Block, propagate bool) {
return return
} }
// Send the block to a subset of our peers // Send the block to a subset of our peers
transfer := peers[:int(math.Sqrt(float64(len(peers))))] transferLen := int(math.Sqrt(float64(len(peers))))
if transferLen < minBroadcastPeers {
transferLen = minBroadcastPeers
}
if transferLen > len(peers) {
transferLen = len(peers)
}
transfer := peers[:transferLen]
for _, peer := range transfer { for _, peer := range transfer {
peer.AsyncSendNewBlock(block, td) peer.AsyncSendNewBlock(block, td)
} }

View File

@ -17,6 +17,7 @@
package eth package eth
import ( import (
"fmt"
"math" "math"
"math/big" "math/big"
"math/rand" "math/rand"
@ -466,14 +467,17 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
} }
// Create a DAO aware protocol manager // Create a DAO aware protocol manager
var ( var (
evmux = new(event.TypeMux) evmux = new(event.TypeMux)
pow = ethash.NewFaker() pow = ethash.NewFaker()
db = ethdb.NewMemDatabase() db = ethdb.NewMemDatabase()
config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked} config = &params.ChainConfig{DAOForkBlock: big.NewInt(1), DAOForkSupport: localForked}
gspec = &core.Genesis{Config: config} gspec = &core.Genesis{Config: config}
genesis = gspec.MustCommit(db) genesis = gspec.MustCommit(db)
blockchain, _ = core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
) )
blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db) pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil { if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err) t.Fatalf("failed to start test protocol manager: %v", err)
@ -520,3 +524,90 @@ func testDAOChallenge(t *testing.T, localForked, remoteForked bool, timeout bool
} }
} }
} }
func TestBroadcastBlock(t *testing.T) {
var tests = []struct {
totalPeers int
broadcastExpected int
}{
{1, 1},
{2, 2},
{3, 3},
{4, 4},
{5, 4},
{9, 4},
{12, 4},
{16, 4},
{26, 5},
{100, 10},
}
for _, test := range tests {
testBroadcastBlock(t, test.totalPeers, test.broadcastExpected)
}
}
func testBroadcastBlock(t *testing.T, totalPeers, broadcastExpected int) {
var (
evmux = new(event.TypeMux)
pow = ethash.NewFaker()
db = ethdb.NewMemDatabase()
config = &params.ChainConfig{}
gspec = &core.Genesis{Config: config}
genesis = gspec.MustCommit(db)
)
blockchain, err := core.NewBlockChain(db, nil, config, pow, vm.Config{}, nil)
if err != nil {
t.Fatalf("failed to create new blockchain: %v", err)
}
pm, err := NewProtocolManager(config, downloader.FullSync, DefaultConfig.NetworkId, evmux, new(testTxPool), pow, blockchain, db)
if err != nil {
t.Fatalf("failed to start test protocol manager: %v", err)
}
pm.Start(1000)
defer pm.Stop()
var peers []*testPeer
for i := 0; i < totalPeers; i++ {
peer, _ := newTestPeer(fmt.Sprintf("peer %d", i), eth63, pm, true)
defer peer.close()
peers = append(peers, peer)
}
chain, _ := core.GenerateChain(gspec.Config, genesis, ethash.NewFaker(), db, 1, func(i int, gen *core.BlockGen) {})
pm.BroadcastBlock(chain[0], true /*propagate*/)
errCh := make(chan error, totalPeers)
doneCh := make(chan struct{}, totalPeers)
for _, peer := range peers {
go func(p *testPeer) {
if err := p2p.ExpectMsg(p.app, NewBlockMsg, &newBlockData{Block: chain[0], TD: big.NewInt(131136)}); err != nil {
errCh <- err
} else {
doneCh <- struct{}{}
}
}(peer)
}
timeoutCh := time.NewTimer(time.Millisecond * 100).C
var receivedCount int
outer:
for {
select {
case err = <-errCh:
break outer
case <-doneCh:
receivedCount++
if receivedCount == totalPeers {
break outer
}
case <-timeoutCh:
break outer
}
}
for _, peer := range peers {
peer.app.Close()
}
if err != nil {
t.Errorf("error matching block by peer: %v", err)
}
if receivedCount != broadcastExpected {
t.Errorf("block broadcast to %d peers, expected %d", receivedCount, broadcastExpected)
}
}