diff --git a/p2p/dial.go b/p2p/dial.go index d228514fc..075a0f936 100644 --- a/p2p/dial.go +++ b/p2p/dial.go @@ -350,7 +350,7 @@ func (t *dialTask) dial(srv *Server, dest *enode.Node) error { if err != nil { return &dialError{err} } - mfd := newMeteredConn(fd, false) + mfd := newMeteredConn(fd, false, dest.IP()) return srv.SetupConn(mfd, t.flags, dest) } diff --git a/p2p/metrics.go b/p2p/metrics.go index 2d52fd1fd..6a7c0bad3 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -19,53 +19,215 @@ package p2p import ( + "fmt" "net" + "sync" + "sync/atomic" + "time" + "github.com/ethereum/go-ethereum/p2p/enode" + + "github.com/ethereum/go-ethereum/event" + "github.com/ethereum/go-ethereum/log" "github.com/ethereum/go-ethereum/metrics" ) -var ( - ingressConnectMeter = metrics.NewRegisteredMeter("p2p/InboundConnects", nil) - ingressTrafficMeter = metrics.NewRegisteredMeter("p2p/InboundTraffic", nil) - egressConnectMeter = metrics.NewRegisteredMeter("p2p/OutboundConnects", nil) - egressTrafficMeter = metrics.NewRegisteredMeter("p2p/OutboundTraffic", nil) +const ( + MetricsInboundConnects = "p2p/InboundConnects" // Name for the registered inbound connects meter + MetricsInboundTraffic = "p2p/InboundTraffic" // Name for the registered inbound traffic meter + MetricsOutboundConnects = "p2p/OutboundConnects" // Name for the registered outbound connects meter + MetricsOutboundTraffic = "p2p/OutboundTraffic" // Name for the registered outbound traffic meter + + MeteredPeerLimit = 1024 // This amount of peers are individually metered ) +var ( + ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections + ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic + egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections + egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic + + PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress + PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.DefaultRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress + + meteredPeerFeed event.Feed // Event feed for peer metrics + meteredPeerCount int32 // Actually stored peer connection count +) + +// MeteredPeerEventType is the type of peer events emitted by a metered connection. +type MeteredPeerEventType int + +const ( + // PeerConnected is the type of event emitted when a peer successfully + // made the handshake. + PeerConnected MeteredPeerEventType = iota + + // PeerDisconnected is the type of event emitted when a peer disconnects. + PeerDisconnected + + // PeerHandshakeFailed is the type of event emitted when a peer fails to + // make the handshake or disconnects before the handshake. + PeerHandshakeFailed +) + +// MeteredPeerEvent is an event emitted when peers connect or disconnect. +type MeteredPeerEvent struct { + Type MeteredPeerEventType // Type of peer event + IP net.IP // IP address of the peer + ID string // NodeID of the peer + Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection + Ingress uint64 // Ingress count at the moment of the event + Egress uint64 // Egress count at the moment of the event +} + +// SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events +// if metrics collection is enabled. +func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription { + return meteredPeerFeed.Subscribe(ch) +} + // meteredConn is a wrapper around a net.Conn that meters both the // inbound and outbound network traffic. type meteredConn struct { net.Conn // Network connection to wrap with metering + + connected time.Time // Connection time of the peer + ip net.IP // IP address of the peer + id string // NodeID of the peer + + // trafficMetered denotes if the peer is registered in the traffic registries. + // Its value is true if the metered peer count doesn't reach the limit in the + // moment of the peer's connection. + trafficMetered bool + ingressMeter metrics.Meter // Meter for the read bytes of the peer + egressMeter metrics.Meter // Meter for the written bytes of the peer + + lock sync.RWMutex // Lock protecting the metered connection's internals } -// newMeteredConn creates a new metered connection, also bumping the ingress or -// egress connection meter. If the metrics system is disabled, this function -// returns the original object. -func newMeteredConn(conn net.Conn, ingress bool) net.Conn { +// newMeteredConn creates a new metered connection, bumps the ingress or egress +// connection meter and also increases the metered peer count. If the metrics +// system is disabled or the IP address is unspecified, this function returns +// the original object. +func newMeteredConn(conn net.Conn, ingress bool, ip net.IP) net.Conn { // Short circuit if metrics are disabled if !metrics.Enabled { return conn } - // Otherwise bump the connection counters and wrap the connection + if ip.IsUnspecified() { + log.Warn("Peer IP is unspecified") + return conn + } + // Bump the connection counters and wrap the connection if ingress { ingressConnectMeter.Mark(1) } else { egressConnectMeter.Mark(1) } - return &meteredConn{Conn: conn} + return &meteredConn{ + Conn: conn, + ip: ip, + connected: time.Now(), + } } -// Read delegates a network read to the underlying connection, bumping the ingress -// traffic meter along the way. +// Read delegates a network read to the underlying connection, bumping the common +// and the peer ingress traffic meters along the way. func (c *meteredConn) Read(b []byte) (n int, err error) { n, err = c.Conn.Read(b) ingressTrafficMeter.Mark(int64(n)) - return + c.lock.RLock() + if c.trafficMetered { + c.ingressMeter.Mark(int64(n)) + } + c.lock.RUnlock() + return n, err } -// Write delegates a network write to the underlying connection, bumping the -// egress traffic meter along the way. +// Write delegates a network write to the underlying connection, bumping the common +// and the peer egress traffic meters along the way. func (c *meteredConn) Write(b []byte) (n int, err error) { n, err = c.Conn.Write(b) egressTrafficMeter.Mark(int64(n)) - return + c.lock.RLock() + if c.trafficMetered { + c.egressMeter.Mark(int64(n)) + } + c.lock.RUnlock() + return n, err +} + +// handshakeDone is called when a peer handshake is done. Registers the peer to +// the ingress and the egress traffic registries using the peer's IP and node ID, +// also emits connect event. +func (c *meteredConn) handshakeDone(nodeID enode.ID) { + id := nodeID.String() + if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit { + // Don't register the peer in the traffic registries. + atomic.AddInt32(&meteredPeerCount, -1) + c.lock.Lock() + c.id, c.trafficMetered = id, false + c.lock.Unlock() + log.Warn("Metered peer count reached the limit") + } else { + key := fmt.Sprintf("%s/%s", c.ip, id) + c.lock.Lock() + c.id, c.trafficMetered = id, true + c.ingressMeter = metrics.NewRegisteredMeter(key, PeerIngressRegistry) + c.egressMeter = metrics.NewRegisteredMeter(key, PeerEgressRegistry) + c.lock.Unlock() + } + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerConnected, + IP: c.ip, + ID: id, + Elapsed: time.Since(c.connected), + }) +} + +// Close delegates a close operation to the underlying connection, unregisters +// the peer from the traffic registries and emits close event. +func (c *meteredConn) Close() error { + err := c.Conn.Close() + c.lock.RLock() + if c.id == "" { + // If the peer disconnects before the handshake. + c.lock.RUnlock() + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerHandshakeFailed, + IP: c.ip, + Elapsed: time.Since(c.connected), + }) + return err + } + id := c.id + if !c.trafficMetered { + // If the peer isn't registered in the traffic registries. + c.lock.RUnlock() + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerDisconnected, + IP: c.ip, + ID: id, + }) + return err + } + ingress, egress := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()) + c.lock.RUnlock() + + // Decrement the metered peer count + atomic.AddInt32(&meteredPeerCount, -1) + + // Unregister the peer from the traffic registries + key := fmt.Sprintf("%s/%s", c.ip, id) + PeerIngressRegistry.Unregister(key) + PeerEgressRegistry.Unregister(key) + + meteredPeerFeed.Send(MeteredPeerEvent{ + Type: PeerDisconnected, + IP: c.ip, + ID: id, + Ingress: ingress, + Egress: egress, + }) + return err } diff --git a/p2p/server.go b/p2p/server.go index 6482c0401..38a881f7b 100644 --- a/p2p/server.go +++ b/p2p/server.go @@ -864,7 +864,11 @@ func (srv *Server) listenLoop() { } } - fd = newMeteredConn(fd, true) + var ip net.IP + if tcp, ok := fd.RemoteAddr().(*net.TCPAddr); ok { + ip = tcp.IP + } + fd = newMeteredConn(fd, true, ip) srv.log.Trace("Accepted connection", "addr", fd.RemoteAddr()) go func() { srv.SetupConn(fd, inboundConn, nil) @@ -917,6 +921,9 @@ func (srv *Server) setupConn(c *conn, flags connFlag, dialDest *enode.Node) erro } else { c.node = nodeFromConn(remotePubkey, c.fd) } + if conn, ok := c.fd.(*meteredConn); ok { + conn.handshakeDone(c.node.ID()) + } clog := srv.log.New("id", c.node.ID(), "addr", c.fd.RemoteAddr(), "conn", c.flags) err = srv.checkpoint(c, srv.posthandshake) if err != nil {