Merge pull request #840 from karalabe/throttled-dialing

p2p: throttled handshakes
This commit is contained in:
Jeffrey Wilcke 2015-05-08 06:53:40 -07:00
commit 23454dcfcb
6 changed files with 180 additions and 19 deletions

View File

@ -242,6 +242,7 @@ JavaScript API. See https://github.com/ethereum/go-ethereum/wiki/Javascipt-Conso
utils.JSpathFlag, utils.JSpathFlag,
utils.ListenPortFlag, utils.ListenPortFlag,
utils.MaxPeersFlag, utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.EtherbaseFlag, utils.EtherbaseFlag,
utils.MinerThreadsFlag, utils.MinerThreadsFlag,
utils.MiningEnabledFlag, utils.MiningEnabledFlag,

View File

@ -75,6 +75,7 @@ func init() {
utils.LogFileFlag, utils.LogFileFlag,
utils.LogLevelFlag, utils.LogLevelFlag,
utils.MaxPeersFlag, utils.MaxPeersFlag,
utils.MaxPendingPeersFlag,
utils.MinerThreadsFlag, utils.MinerThreadsFlag,
utils.NATFlag, utils.NATFlag,
utils.NodeKeyFileFlag, utils.NodeKeyFileFlag,

View File

@ -197,6 +197,11 @@ var (
Usage: "Maximum number of network peers (network disabled if set to 0)", Usage: "Maximum number of network peers (network disabled if set to 0)",
Value: 16, Value: 16,
} }
MaxPendingPeersFlag = cli.IntFlag{
Name: "maxpendpeers",
Usage: "Maximum number of pending connection attempts (defaults used if set to 0)",
Value: 0,
}
ListenPortFlag = cli.IntFlag{ ListenPortFlag = cli.IntFlag{
Name: "port", Name: "port",
Usage: "Network listening port", Usage: "Network listening port",
@ -292,6 +297,7 @@ func MakeEthConfig(clientID, version string, ctx *cli.Context) *eth.Config {
AccountManager: GetAccountManager(ctx), AccountManager: GetAccountManager(ctx),
VmDebug: ctx.GlobalBool(VMDebugFlag.Name), VmDebug: ctx.GlobalBool(VMDebugFlag.Name),
MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name), MaxPeers: ctx.GlobalInt(MaxPeersFlag.Name),
MaxPendingPeers: ctx.GlobalInt(MaxPendingPeersFlag.Name),
Port: ctx.GlobalString(ListenPortFlag.Name), Port: ctx.GlobalString(ListenPortFlag.Name),
NAT: GetNAT(ctx), NAT: GetNAT(ctx),
NatSpec: ctx.GlobalBool(NatspecEnabledFlag.Name), NatSpec: ctx.GlobalBool(NatspecEnabledFlag.Name),

View File

@ -61,6 +61,7 @@ type Config struct {
NatSpec bool NatSpec bool
MaxPeers int MaxPeers int
MaxPendingPeers int
Port string Port string
// Space-separated list of discovery node URLs // Space-separated list of discovery node URLs
@ -283,6 +284,7 @@ func New(config *Config) (*Ethereum, error) {
PrivateKey: netprv, PrivateKey: netprv,
Name: config.Name, Name: config.Name,
MaxPeers: config.MaxPeers, MaxPeers: config.MaxPeers,
MaxPendingPeers: config.MaxPendingPeers,
Protocols: protocols, Protocols: protocols,
NAT: config.NAT, NAT: config.NAT,
NoDial: !config.Dial, NoDial: !config.Dial,

View File

@ -22,10 +22,11 @@ const (
refreshPeersInterval = 30 * time.Second refreshPeersInterval = 30 * time.Second
staticPeerCheckInterval = 15 * time.Second staticPeerCheckInterval = 15 * time.Second
// This is the maximum number of inbound connection // Maximum number of concurrently handshaking inbound connections.
// that are allowed to linger between 'accepted' and maxAcceptConns = 10
// 'added as peer'.
maxAcceptConns = 50 // Maximum number of concurrently dialing outbound connections.
maxDialingConns = 10
// total timeout for encryption handshake and protocol // total timeout for encryption handshake and protocol
// handshake in both directions. // handshake in both directions.
@ -52,6 +53,11 @@ type Server struct {
// connected. It must be greater than zero. // connected. It must be greater than zero.
MaxPeers int MaxPeers int
// MaxPendingPeers is the maximum number of peers that can be pending in the
// handshake phase, counted separately for inbound and outbound connections.
// Zero defaults to preset values.
MaxPendingPeers int
// Name sets the node name of this server. // Name sets the node name of this server.
// Use common.MakeName to create a name that follows existing conventions. // Use common.MakeName to create a name that follows existing conventions.
Name string Name string
@ -331,8 +337,12 @@ func (srv *Server) listenLoop() {
// This channel acts as a semaphore limiting // This channel acts as a semaphore limiting
// active inbound connections that are lingering pre-handshake. // active inbound connections that are lingering pre-handshake.
// If all slots are taken, no further connections are accepted. // If all slots are taken, no further connections are accepted.
slots := make(chan struct{}, maxAcceptConns) tokens := maxAcceptConns
for i := 0; i < maxAcceptConns; i++ { if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{} slots <- struct{}{}
} }
@ -401,7 +411,15 @@ func (srv *Server) dialLoop() {
defer srv.loopWG.Done() defer srv.loopWG.Done()
defer refresh.Stop() defer refresh.Stop()
// TODO: maybe limit number of active dials // Limit the number of concurrent dials
tokens := maxAcceptConns
if srv.MaxPendingPeers > 0 {
tokens = srv.MaxPendingPeers
}
slots := make(chan struct{}, tokens)
for i := 0; i < tokens; i++ {
slots <- struct{}{}
}
dial := func(dest *discover.Node) { dial := func(dest *discover.Node) {
// Don't dial nodes that would fail the checks in addPeer. // Don't dial nodes that would fail the checks in addPeer.
// This is important because the connection handshake is a lot // This is important because the connection handshake is a lot
@ -413,11 +431,14 @@ func (srv *Server) dialLoop() {
if !ok || dialing[dest.ID] { if !ok || dialing[dest.ID] {
return return
} }
// Request a dial slot to prevent CPU exhaustion
<-slots
dialing[dest.ID] = true dialing[dest.ID] = true
srv.peerWG.Add(1) srv.peerWG.Add(1)
go func() { go func() {
srv.dialNode(dest) srv.dialNode(dest)
slots <- struct{}{}
dialed <- dest dialed <- dest
}() }()
} }

View File

@ -369,6 +369,136 @@ func TestServerTrustedPeers(t *testing.T) {
} }
} }
// Tests that a failed dial will temporarily throttle a peer.
func TestServerMaxPendingDials(t *testing.T) {
defer testlog(t).detach()
// Start a simple test server
server := &Server{
ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(),
MaxPeers: 10,
MaxPendingPeers: 1,
}
if err := server.Start(); err != nil {
t.Fatal("failed to start test server: %v", err)
}
defer server.Stop()
// Simulate two separate remote peers
peers := make(chan *discover.Node, 2)
conns := make(chan net.Conn, 2)
for i := 0; i < 2; i++ {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
t.Fatalf("listener %d: failed to setup: %v", i, err)
}
defer listener.Close()
addr := listener.Addr().(*net.TCPAddr)
peers <- &discover.Node{
ID: discover.PubkeyID(&newkey().PublicKey),
IP: addr.IP,
TCP: uint16(addr.Port),
}
go func() {
conn, err := listener.Accept()
if err == nil {
conns <- conn
}
}()
}
// Request a dial for both peers
go func() {
for i := 0; i < 2; i++ {
server.staticDial <- <-peers // hack piggybacking the static implementation
}
}()
// Make sure only one outbound connection goes through
var conn net.Conn
select {
case conn = <-conns:
case <-time.After(100 * time.Millisecond):
t.Fatalf("first dial timeout")
}
select {
case conn = <-conns:
t.Fatalf("second dial completed prematurely")
case <-time.After(100 * time.Millisecond):
}
// Finish the first dial, check the second
conn.Close()
select {
case conn = <-conns:
conn.Close()
case <-time.After(100 * time.Millisecond):
t.Fatalf("second dial timeout")
}
}
func TestServerMaxPendingAccepts(t *testing.T) {
defer testlog(t).detach()
// Start a test server and a peer sink for synchronization
started := make(chan *Peer)
server := &Server{
ListenAddr: "127.0.0.1:0",
PrivateKey: newkey(),
MaxPeers: 10,
MaxPendingPeers: 1,
NoDial: true,
newPeerHook: func(p *Peer) { started <- p },
}
if err := server.Start(); err != nil {
t.Fatal("failed to start test server: %v", err)
}
defer server.Stop()
// Try and connect to the server on multiple threads concurrently
conns := make([]net.Conn, 2)
for i := 0; i < 2; i++ {
dialer := &net.Dialer{Deadline: time.Now().Add(3 * time.Second)}
conn, err := dialer.Dial("tcp", server.ListenAddr)
if err != nil {
t.Fatalf("failed to dial server: %v", err)
}
conns[i] = conn
}
// Check that a handshake on the second doesn't pass
go func() {
key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err := setupConn(conns[1], key, shake, server.Self(), false, server.trustedNodes); err != nil {
t.Fatalf("failed to run handshake: %v", err)
}
}()
select {
case <-started:
t.Fatalf("handshake on second connection accepted")
case <-time.After(time.Second):
}
// Shake on first, check that both go through
go func() {
key := newkey()
shake := &protoHandshake{Version: baseProtocolVersion, ID: discover.PubkeyID(&key.PublicKey)}
if _, err := setupConn(conns[0], key, shake, server.Self(), false, server.trustedNodes); err != nil {
t.Fatalf("failed to run handshake: %v", err)
}
}()
for i := 0; i < 2; i++ {
select {
case <-started:
case <-time.After(time.Second):
t.Fatalf("peer %d: handshake timeout", i)
}
}
}
func newkey() *ecdsa.PrivateKey { func newkey() *ecdsa.PrivateKey {
key, err := crypto.GenerateKey() key, err := crypto.GenerateKey()
if err != nil { if err != nil {