diff --git a/p2p/handshake.go b/p2p/handshake.go index 3ad25bae4..7fc497517 100644 --- a/p2p/handshake.go +++ b/p2p/handshake.go @@ -37,7 +37,7 @@ const ( // // The MsgReadWriter is usually layered as follows: // -// lockedRW (thread-safety for ReadMsg, WriteMsg) +// netWrapper (I/O timeouts, thread-safe ReadMsg, WriteMsg) // rlpxFrameRW (message encoding, encryption, authentication) // bufio.ReadWriter (buffering) // net.Conn (network I/O) @@ -83,7 +83,6 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) ( } // Run the protocol handshake using authenticated messages. - // TODO: move buffering setup here (out of newFrameRW) rw := newRlpxFrameRW(fd, secrets) rhs, err := readProtocolHandshake(rw, our) if err != nil { @@ -96,7 +95,7 @@ func setupInboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake) ( if err := writeProtocolHandshake(rw, our); err != nil { return nil, fmt.Errorf("protocol write error: %v", err) } - return &conn{&lockedRW{wrapped: rw}, rhs}, nil + return &conn{rw, rhs}, nil } func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, dial *discover.Node) (*conn, error) { @@ -106,7 +105,6 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, } // Run the protocol handshake using authenticated messages. - // TODO: move buffering setup here (out of newFrameRW) rw := newRlpxFrameRW(fd, secrets) if err := writeProtocolHandshake(rw, our); err != nil { return nil, fmt.Errorf("protocol write error: %v", err) @@ -118,7 +116,7 @@ func setupOutboundConn(fd net.Conn, prv *ecdsa.PrivateKey, our *protoHandshake, if rhs.ID != dial.ID { return nil, errors.New("dialed node id mismatch") } - return &conn{&lockedRW{wrapped: rw}, rhs}, nil + return &conn{rw, rhs}, nil } // encHandshake contains the state of the encryption handshake. diff --git a/p2p/message.go b/p2p/message.go index 04b9e71f3..f88c31d1d 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -6,6 +6,7 @@ import ( "fmt" "io" "io/ioutil" + "net" "sync" "sync/atomic" "time" @@ -14,28 +15,6 @@ import ( "github.com/ethereum/go-ethereum/rlp" ) -// parameters for frameRW -const ( - // maximum time allowed for reading a message header. - // this is effectively the amount of time a connection can be idle. - frameReadTimeout = 1 * time.Minute - - // maximum time allowed for reading the payload data of a message. - // this is shorter than (and distinct from) frameReadTimeout because - // the connection is not considered idle while a message is transferred. - // this also limits the payload size of messages to how much the connection - // can transfer within the timeout. - payloadReadTimeout = 5 * time.Second - - // maximum amount of time allowed for writing a complete message. - msgWriteTimeout = 5 * time.Second - - // messages smaller than this many bytes will be read at - // once before passing them to a protocol. this increases - // concurrency in the processing. - wholePayloadSize = 64 * 1024 -) - // Msg defines the structure of a p2p message. // // Note that a Msg can only be sent once since the Payload reader is @@ -103,22 +82,27 @@ func EncodeMsg(w MsgWriter, code uint64, data ...interface{}) error { return w.WriteMsg(NewMsg(code, data...)) } -// lockedRW wraps a MsgReadWriter with locks around -// ReadMsg and WriteMsg. -type lockedRW struct { +// netWrapper wrapsa MsgReadWriter with locks around +// ReadMsg/WriteMsg and applies read/write deadlines. +type netWrapper struct { rmu, wmu sync.Mutex - wrapped MsgReadWriter + + rtimeout, wtimeout time.Duration + conn net.Conn + wrapped MsgReadWriter } -func (rw *lockedRW) ReadMsg() (Msg, error) { +func (rw *netWrapper) ReadMsg() (Msg, error) { rw.rmu.Lock() defer rw.rmu.Unlock() + rw.conn.SetReadDeadline(time.Now().Add(rw.rtimeout)) return rw.wrapped.ReadMsg() } -func (rw *lockedRW) WriteMsg(msg Msg) error { +func (rw *netWrapper) WriteMsg(msg Msg) error { rw.wmu.Lock() defer rw.wmu.Unlock() + rw.conn.SetWriteDeadline(time.Now().Add(rw.wtimeout)) return rw.wrapped.WriteMsg(msg) } diff --git a/p2p/peer.go b/p2p/peer.go index 025be4ba9..c2c83abfc 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -20,8 +20,8 @@ const ( baseProtocolLength = uint64(16) baseProtocolMaxMsgSize = 10 * 1024 * 1024 - disconnectGracePeriod = 2 * time.Second pingInterval = 15 * time.Second + disconnectGracePeriod = 2 * time.Second ) const ( @@ -176,6 +176,7 @@ func (p *Peer) politeDisconnect(reason DiscReason) { func (p *Peer) readLoop() error { for { + p.conn.SetDeadline(time.Now().Add(frameReadTimeout)) msg, err := p.rw.ReadMsg() if err != nil { return err diff --git a/p2p/rlpx.go b/p2p/rlpx.go index a041bb314..166bbb5e6 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -21,6 +21,11 @@ var ( zero16 = make([]byte, 16) ) +// rlpxFrameRW implements a simplified version of RLPx framing. +// chunked messages are not supported and all headers are equal to +// zeroHeader. +// +// rlpxFrameRW is not safe for concurrent use from multiple goroutines. type rlpxFrameRW struct { conn io.ReadWriter enc cipher.Stream diff --git a/p2p/server.go b/p2p/server.go index 67d5514b4..8f99bc33d 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -17,9 +17,17 @@ import ( ) const ( - handshakeTimeout = 5 * time.Second defaultDialTimeout = 10 * time.Second refreshPeersInterval = 30 * time.Second + + // total timeout for encryption handshake and protocol + // handshake in both directions. + handshakeTimeout = 5 * time.Second + // maximum time allowed for reading a complete message. + // this is effectively the amount of time a connection can be idle. + frameReadTimeout = 1 * time.Minute + // maximum amount of time allowed for writing a complete message. + frameWriteTimeout = 5 * time.Second ) var srvlog = logger.NewLogger("P2P Server") @@ -359,14 +367,18 @@ func (srv *Server) findPeers() { func (srv *Server) startPeer(fd net.Conn, dest *discover.Node) { // TODO: handle/store session token - // TODO: reenable deadlines - // fd.SetDeadline(time.Now().Add(handshakeTimeout)) + fd.SetDeadline(time.Now().Add(handshakeTimeout)) conn, err := srv.setupFunc(fd, srv.PrivateKey, srv.ourHandshake, dest) if err != nil { fd.Close() srvlog.Debugf("Handshake with %v failed: %v", fd.RemoteAddr(), err) return } + + conn.MsgReadWriter = &netWrapper{ + wrapped: conn.MsgReadWriter, + conn: fd, rtimeout: frameReadTimeout, wtimeout: frameWriteTimeout, + } p := newPeer(fd, conn, srv.Protocols) if ok, reason := srv.addPeer(conn.ID, p); !ok { srvlog.DebugDetailf("Not adding %v (%v)\n", p, reason)