diff --git a/p2p/message.go b/p2p/message.go index b98773222..10b55a939 100644 --- a/p2p/message.go +++ b/p2p/message.go @@ -39,9 +39,13 @@ import ( // separate Msg with a bytes.Reader as Payload for each send. type Msg struct { Code uint64 - Size uint32 // size of the paylod + Size uint32 // Size of the raw payload Payload io.Reader ReceivedAt time.Time + + meterCap Cap // Protocol name and version for egress metering + meterCode uint64 // Message within protocol for egress metering + meterSize uint32 // Compressed message size for ingress metering } // Decode parses the RLP content of a message into diff --git a/p2p/peer.go b/p2p/peer.go index 372ba8d02..9a9788bc1 100644 --- a/p2p/peer.go +++ b/p2p/peer.go @@ -28,6 +28,7 @@ import ( "github.com/ethereum/go-ethereum/common/mclock" "github.com/ethereum/go-ethereum/event" "github.com/ethereum/go-ethereum/log" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/p2p/enode" "github.com/ethereum/go-ethereum/p2p/enr" "github.com/ethereum/go-ethereum/rlp" @@ -300,6 +301,9 @@ func (p *Peer) handle(msg Msg) error { if err != nil { return fmt.Errorf("msg code out of range: %v", msg.Code) } + if metrics.Enabled { + metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize)) + } select { case proto.in <- msg: return nil @@ -398,7 +402,11 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) { if msg.Code >= rw.Length { return newPeerError(errInvalidMsgCode, "not handled") } + msg.meterCap = rw.cap() + msg.meterCode = msg.Code + msg.Code += rw.offset + select { case <-rw.wstart: err = rw.w.WriteMsg(msg) diff --git a/p2p/rlpx.go b/p2p/rlpx.go index 52e1eb8a4..115021fa9 100644 --- a/p2p/rlpx.go +++ b/p2p/rlpx.go @@ -38,6 +38,7 @@ import ( "github.com/ethereum/go-ethereum/common/bitutil" "github.com/ethereum/go-ethereum/crypto" "github.com/ethereum/go-ethereum/crypto/ecies" + "github.com/ethereum/go-ethereum/metrics" "github.com/ethereum/go-ethereum/rlp" "github.com/golang/snappy" "golang.org/x/crypto/sha3" @@ -602,6 +603,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error { msg.Payload = bytes.NewReader(payload) msg.Size = uint32(len(payload)) } + msg.meterSize = msg.Size + if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages + metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize)) + } // write header headbuf := make([]byte, 32) fsize := uint32(len(ptype)) + msg.Size @@ -686,6 +691,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) { return msg, err } msg.Size = uint32(content.Len()) + msg.meterSize = msg.Size msg.Payload = content // if snappy is enabled, verify and decompress message