p2p: remove MeteredPeerEvent (#20679)
This event was added for the dashboard, but we don't need it anymore since the dashboard is gone.
This commit is contained in:
parent
26284ec3cc
commit
ac72787768
168
p2p/metrics.go
168
p2p/metrics.go
@ -20,102 +20,37 @@ package p2p
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"sync"
|
|
||||||
"sync/atomic"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/ethereum/go-ethereum/event"
|
|
||||||
"github.com/ethereum/go-ethereum/log"
|
|
||||||
"github.com/ethereum/go-ethereum/metrics"
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
MetricsInboundTraffic = "p2p/ingress" // Name for the registered inbound traffic meter
|
ingressMeterName = "p2p/ingress"
|
||||||
MetricsOutboundTraffic = "p2p/egress" // Name for the registered outbound traffic meter
|
egressMeterName = "p2p/egress"
|
||||||
MetricsOutboundConnects = "p2p/dials" // Name for the registered outbound connects meter
|
|
||||||
MetricsInboundConnects = "p2p/serves" // Name for the registered inbound connects meter
|
|
||||||
|
|
||||||
MeteredPeerLimit = 1024 // This amount of peers are individually metered
|
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
ingressConnectMeter = metrics.NewRegisteredMeter(MetricsInboundConnects, nil) // Meter counting the ingress connections
|
ingressConnectMeter = metrics.NewRegisteredMeter("p2p/serves", nil)
|
||||||
ingressTrafficMeter = metrics.NewRegisteredMeter(MetricsInboundTraffic, nil) // Meter metering the cumulative ingress traffic
|
ingressTrafficMeter = metrics.NewRegisteredMeter(ingressMeterName, nil)
|
||||||
egressConnectMeter = metrics.NewRegisteredMeter(MetricsOutboundConnects, nil) // Meter counting the egress connections
|
egressConnectMeter = metrics.NewRegisteredMeter("p2p/dials", nil)
|
||||||
egressTrafficMeter = metrics.NewRegisteredMeter(MetricsOutboundTraffic, nil) // Meter metering the cumulative egress traffic
|
egressTrafficMeter = metrics.NewRegisteredMeter(egressMeterName, nil)
|
||||||
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil) // Gauge tracking the current peer count
|
activePeerGauge = metrics.NewRegisteredGauge("p2p/peers", nil)
|
||||||
|
|
||||||
PeerIngressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsInboundTraffic+"/") // Registry containing the peer ingress
|
|
||||||
PeerEgressRegistry = metrics.NewPrefixedChildRegistry(metrics.EphemeralRegistry, MetricsOutboundTraffic+"/") // Registry containing the peer egress
|
|
||||||
|
|
||||||
meteredPeerFeed event.Feed // Event feed for peer metrics
|
|
||||||
meteredPeerCount int32 // Actually stored peer connection count
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// MeteredPeerEventType is the type of peer events emitted by a metered connection.
|
|
||||||
type MeteredPeerEventType int
|
|
||||||
|
|
||||||
const (
|
|
||||||
// PeerHandshakeSucceeded is the type of event
|
|
||||||
// emitted when a peer successfully makes the handshake.
|
|
||||||
PeerHandshakeSucceeded MeteredPeerEventType = iota
|
|
||||||
|
|
||||||
// PeerHandshakeFailed is the type of event emitted when a peer fails to
|
|
||||||
// make the handshake or disconnects before it.
|
|
||||||
PeerHandshakeFailed
|
|
||||||
|
|
||||||
// PeerDisconnected is the type of event emitted when a peer disconnects.
|
|
||||||
PeerDisconnected
|
|
||||||
)
|
|
||||||
|
|
||||||
// MeteredPeerEvent is an event emitted when peers connect or disconnect.
|
|
||||||
type MeteredPeerEvent struct {
|
|
||||||
Type MeteredPeerEventType // Type of peer event
|
|
||||||
Addr string // TCP address of the peer
|
|
||||||
Elapsed time.Duration // Time elapsed between the connection and the handshake/disconnection
|
|
||||||
Peer *Peer // Connected remote node instance
|
|
||||||
Ingress uint64 // Ingress count at the moment of the event
|
|
||||||
Egress uint64 // Egress count at the moment of the event
|
|
||||||
}
|
|
||||||
|
|
||||||
// SubscribeMeteredPeerEvent registers a subscription for peer life-cycle events
|
|
||||||
// if metrics collection is enabled.
|
|
||||||
func SubscribeMeteredPeerEvent(ch chan<- MeteredPeerEvent) event.Subscription {
|
|
||||||
return meteredPeerFeed.Subscribe(ch)
|
|
||||||
}
|
|
||||||
|
|
||||||
// meteredConn is a wrapper around a net.Conn that meters both the
|
// meteredConn is a wrapper around a net.Conn that meters both the
|
||||||
// inbound and outbound network traffic.
|
// inbound and outbound network traffic.
|
||||||
type meteredConn struct {
|
type meteredConn struct {
|
||||||
net.Conn // Network connection to wrap with metering
|
net.Conn
|
||||||
|
|
||||||
connected time.Time // Connection time of the peer
|
|
||||||
addr *net.TCPAddr // TCP address of the peer
|
|
||||||
peer *Peer // Peer instance
|
|
||||||
|
|
||||||
// trafficMetered denotes if the peer is registered in the traffic registries.
|
|
||||||
// Its value is true if the metered peer count doesn't reach the limit in the
|
|
||||||
// moment of the peer's connection.
|
|
||||||
trafficMetered bool
|
|
||||||
ingressMeter metrics.Meter // Meter for the read bytes of the peer
|
|
||||||
egressMeter metrics.Meter // Meter for the written bytes of the peer
|
|
||||||
|
|
||||||
lock sync.RWMutex // Lock protecting the metered connection's internals
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// newMeteredConn creates a new metered connection, bumps the ingress or egress
|
// newMeteredConn creates a new metered connection, bumps the ingress or egress
|
||||||
// connection meter and also increases the metered peer count. If the metrics
|
// connection meter and also increases the metered peer count. If the metrics
|
||||||
// system is disabled or the IP address is unspecified, this function returns
|
// system is disabled, function returns the original connection.
|
||||||
// the original object.
|
|
||||||
func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
|
func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
|
||||||
// Short circuit if metrics are disabled
|
// Short circuit if metrics are disabled
|
||||||
if !metrics.Enabled {
|
if !metrics.Enabled {
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
if addr == nil || addr.IP.IsUnspecified() {
|
|
||||||
log.Warn("Peer address is unspecified")
|
|
||||||
return conn
|
|
||||||
}
|
|
||||||
// Bump the connection counters and wrap the connection
|
// Bump the connection counters and wrap the connection
|
||||||
if ingress {
|
if ingress {
|
||||||
ingressConnectMeter.Mark(1)
|
ingressConnectMeter.Mark(1)
|
||||||
@ -123,12 +58,7 @@ func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
|
|||||||
egressConnectMeter.Mark(1)
|
egressConnectMeter.Mark(1)
|
||||||
}
|
}
|
||||||
activePeerGauge.Inc(1)
|
activePeerGauge.Inc(1)
|
||||||
|
return &meteredConn{Conn: conn}
|
||||||
return &meteredConn{
|
|
||||||
Conn: conn,
|
|
||||||
addr: addr,
|
|
||||||
connected: time.Now(),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Read delegates a network read to the underlying connection, bumping the common
|
// Read delegates a network read to the underlying connection, bumping the common
|
||||||
@ -136,11 +66,6 @@ func newMeteredConn(conn net.Conn, ingress bool, addr *net.TCPAddr) net.Conn {
|
|||||||
func (c *meteredConn) Read(b []byte) (n int, err error) {
|
func (c *meteredConn) Read(b []byte) (n int, err error) {
|
||||||
n, err = c.Conn.Read(b)
|
n, err = c.Conn.Read(b)
|
||||||
ingressTrafficMeter.Mark(int64(n))
|
ingressTrafficMeter.Mark(int64(n))
|
||||||
c.lock.RLock()
|
|
||||||
if c.trafficMetered {
|
|
||||||
c.ingressMeter.Mark(int64(n))
|
|
||||||
}
|
|
||||||
c.lock.RUnlock()
|
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -149,84 +74,15 @@ func (c *meteredConn) Read(b []byte) (n int, err error) {
|
|||||||
func (c *meteredConn) Write(b []byte) (n int, err error) {
|
func (c *meteredConn) Write(b []byte) (n int, err error) {
|
||||||
n, err = c.Conn.Write(b)
|
n, err = c.Conn.Write(b)
|
||||||
egressTrafficMeter.Mark(int64(n))
|
egressTrafficMeter.Mark(int64(n))
|
||||||
c.lock.RLock()
|
|
||||||
if c.trafficMetered {
|
|
||||||
c.egressMeter.Mark(int64(n))
|
|
||||||
}
|
|
||||||
c.lock.RUnlock()
|
|
||||||
return n, err
|
return n, err
|
||||||
}
|
}
|
||||||
|
|
||||||
// handshakeDone is called after the connection passes the handshake.
|
|
||||||
func (c *meteredConn) handshakeDone(peer *Peer) {
|
|
||||||
if atomic.AddInt32(&meteredPeerCount, 1) >= MeteredPeerLimit {
|
|
||||||
// Don't register the peer in the traffic registries.
|
|
||||||
atomic.AddInt32(&meteredPeerCount, -1)
|
|
||||||
c.lock.Lock()
|
|
||||||
c.peer, c.trafficMetered = peer, false
|
|
||||||
c.lock.Unlock()
|
|
||||||
log.Warn("Metered peer count reached the limit")
|
|
||||||
} else {
|
|
||||||
enode := peer.Node().String()
|
|
||||||
c.lock.Lock()
|
|
||||||
c.peer, c.trafficMetered = peer, true
|
|
||||||
c.ingressMeter = metrics.NewRegisteredMeter(enode, PeerIngressRegistry)
|
|
||||||
c.egressMeter = metrics.NewRegisteredMeter(enode, PeerEgressRegistry)
|
|
||||||
c.lock.Unlock()
|
|
||||||
}
|
|
||||||
meteredPeerFeed.Send(MeteredPeerEvent{
|
|
||||||
Type: PeerHandshakeSucceeded,
|
|
||||||
Addr: c.addr.String(),
|
|
||||||
Peer: peer,
|
|
||||||
Elapsed: time.Since(c.connected),
|
|
||||||
})
|
|
||||||
}
|
|
||||||
|
|
||||||
// Close delegates a close operation to the underlying connection, unregisters
|
// Close delegates a close operation to the underlying connection, unregisters
|
||||||
// the peer from the traffic registries and emits close event.
|
// the peer from the traffic registries and emits close event.
|
||||||
func (c *meteredConn) Close() error {
|
func (c *meteredConn) Close() error {
|
||||||
err := c.Conn.Close()
|
err := c.Conn.Close()
|
||||||
c.lock.RLock()
|
if err == nil {
|
||||||
if c.peer == nil {
|
|
||||||
// If the peer disconnects before/during the handshake.
|
|
||||||
c.lock.RUnlock()
|
|
||||||
meteredPeerFeed.Send(MeteredPeerEvent{
|
|
||||||
Type: PeerHandshakeFailed,
|
|
||||||
Addr: c.addr.String(),
|
|
||||||
Elapsed: time.Since(c.connected),
|
|
||||||
})
|
|
||||||
activePeerGauge.Dec(1)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
peer := c.peer
|
|
||||||
if !c.trafficMetered {
|
|
||||||
// If the peer isn't registered in the traffic registries.
|
|
||||||
c.lock.RUnlock()
|
|
||||||
meteredPeerFeed.Send(MeteredPeerEvent{
|
|
||||||
Type: PeerDisconnected,
|
|
||||||
Addr: c.addr.String(),
|
|
||||||
Peer: peer,
|
|
||||||
})
|
|
||||||
activePeerGauge.Dec(1)
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
ingress, egress, enode := uint64(c.ingressMeter.Count()), uint64(c.egressMeter.Count()), c.peer.Node().String()
|
|
||||||
c.lock.RUnlock()
|
|
||||||
|
|
||||||
// Decrement the metered peer count
|
|
||||||
atomic.AddInt32(&meteredPeerCount, -1)
|
|
||||||
|
|
||||||
// Unregister the peer from the traffic registries
|
|
||||||
PeerIngressRegistry.Unregister(enode)
|
|
||||||
PeerEgressRegistry.Unregister(enode)
|
|
||||||
|
|
||||||
meteredPeerFeed.Send(MeteredPeerEvent{
|
|
||||||
Type: PeerDisconnected,
|
|
||||||
Addr: c.addr.String(),
|
|
||||||
Peer: peer,
|
|
||||||
Ingress: ingress,
|
|
||||||
Egress: egress,
|
|
||||||
})
|
|
||||||
activePeerGauge.Dec(1)
|
activePeerGauge.Dec(1)
|
||||||
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
@ -302,7 +302,8 @@ func (p *Peer) handle(msg Msg) error {
|
|||||||
return fmt.Errorf("msg code out of range: %v", msg.Code)
|
return fmt.Errorf("msg code out of range: %v", msg.Code)
|
||||||
}
|
}
|
||||||
if metrics.Enabled {
|
if metrics.Enabled {
|
||||||
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize))
|
m := fmt.Sprintf("%s/%s/%d/%#02x", ingressMeterName, proto.Name, proto.Version, msg.Code-proto.offset)
|
||||||
|
metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
|
||||||
}
|
}
|
||||||
select {
|
select {
|
||||||
case proto.in <- msg:
|
case proto.in <- msg:
|
||||||
|
@ -595,7 +595,8 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
|
|||||||
}
|
}
|
||||||
msg.meterSize = msg.Size
|
msg.meterSize = msg.Size
|
||||||
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
|
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
|
||||||
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize))
|
m := fmt.Sprintf("%s/%s/%d/%#02x", egressMeterName, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode)
|
||||||
|
metrics.GetOrRegisterMeter(m, nil).Mark(int64(msg.meterSize))
|
||||||
}
|
}
|
||||||
// write header
|
// write header
|
||||||
headbuf := make([]byte, 32)
|
headbuf := make([]byte, 32)
|
||||||
|
@ -755,9 +755,6 @@ running:
|
|||||||
peers[c.node.ID()] = p
|
peers[c.node.ID()] = p
|
||||||
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", truncateName(c.name))
|
srv.log.Debug("Adding p2p peer", "peercount", len(peers), "id", p.ID(), "conn", c.flags, "addr", p.RemoteAddr(), "name", truncateName(c.name))
|
||||||
srv.dialsched.peerAdded(c)
|
srv.dialsched.peerAdded(c)
|
||||||
if conn, ok := c.fd.(*meteredConn); ok {
|
|
||||||
conn.handshakeDone(p)
|
|
||||||
}
|
|
||||||
if p.Inbound() {
|
if p.Inbound() {
|
||||||
inboundCount++
|
inboundCount++
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user