forked from cerc-io/plugeth
p2p: measure subprotocol bandwidth usage
This commit is contained in:
parent
df89233b57
commit
a2a60869c8
@ -39,9 +39,13 @@ import (
|
|||||||
// separate Msg with a bytes.Reader as Payload for each send.
|
// separate Msg with a bytes.Reader as Payload for each send.
|
||||||
type Msg struct {
|
type Msg struct {
|
||||||
Code uint64
|
Code uint64
|
||||||
Size uint32 // size of the paylod
|
Size uint32 // Size of the raw payload
|
||||||
Payload io.Reader
|
Payload io.Reader
|
||||||
ReceivedAt time.Time
|
ReceivedAt time.Time
|
||||||
|
|
||||||
|
meterCap Cap // Protocol name and version for egress metering
|
||||||
|
meterCode uint64 // Message within protocol for egress metering
|
||||||
|
meterSize uint32 // Compressed message size for ingress metering
|
||||||
}
|
}
|
||||||
|
|
||||||
// Decode parses the RLP content of a message into
|
// Decode parses the RLP content of a message into
|
||||||
|
@ -28,6 +28,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common/mclock"
|
"github.com/ethereum/go-ethereum/common/mclock"
|
||||||
"github.com/ethereum/go-ethereum/event"
|
"github.com/ethereum/go-ethereum/event"
|
||||||
"github.com/ethereum/go-ethereum/log"
|
"github.com/ethereum/go-ethereum/log"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enode"
|
"github.com/ethereum/go-ethereum/p2p/enode"
|
||||||
"github.com/ethereum/go-ethereum/p2p/enr"
|
"github.com/ethereum/go-ethereum/p2p/enr"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
@ -300,6 +301,9 @@ func (p *Peer) handle(msg Msg) error {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return fmt.Errorf("msg code out of range: %v", msg.Code)
|
return fmt.Errorf("msg code out of range: %v", msg.Code)
|
||||||
}
|
}
|
||||||
|
if metrics.Enabled {
|
||||||
|
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsInboundTraffic, proto.Name, proto.Version, msg.Code-proto.offset), nil).Mark(int64(msg.meterSize))
|
||||||
|
}
|
||||||
select {
|
select {
|
||||||
case proto.in <- msg:
|
case proto.in <- msg:
|
||||||
return nil
|
return nil
|
||||||
@ -398,7 +402,11 @@ func (rw *protoRW) WriteMsg(msg Msg) (err error) {
|
|||||||
if msg.Code >= rw.Length {
|
if msg.Code >= rw.Length {
|
||||||
return newPeerError(errInvalidMsgCode, "not handled")
|
return newPeerError(errInvalidMsgCode, "not handled")
|
||||||
}
|
}
|
||||||
|
msg.meterCap = rw.cap()
|
||||||
|
msg.meterCode = msg.Code
|
||||||
|
|
||||||
msg.Code += rw.offset
|
msg.Code += rw.offset
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case <-rw.wstart:
|
case <-rw.wstart:
|
||||||
err = rw.w.WriteMsg(msg)
|
err = rw.w.WriteMsg(msg)
|
||||||
|
@ -38,6 +38,7 @@ import (
|
|||||||
"github.com/ethereum/go-ethereum/common/bitutil"
|
"github.com/ethereum/go-ethereum/common/bitutil"
|
||||||
"github.com/ethereum/go-ethereum/crypto"
|
"github.com/ethereum/go-ethereum/crypto"
|
||||||
"github.com/ethereum/go-ethereum/crypto/ecies"
|
"github.com/ethereum/go-ethereum/crypto/ecies"
|
||||||
|
"github.com/ethereum/go-ethereum/metrics"
|
||||||
"github.com/ethereum/go-ethereum/rlp"
|
"github.com/ethereum/go-ethereum/rlp"
|
||||||
"github.com/golang/snappy"
|
"github.com/golang/snappy"
|
||||||
"golang.org/x/crypto/sha3"
|
"golang.org/x/crypto/sha3"
|
||||||
@ -602,6 +603,10 @@ func (rw *rlpxFrameRW) WriteMsg(msg Msg) error {
|
|||||||
msg.Payload = bytes.NewReader(payload)
|
msg.Payload = bytes.NewReader(payload)
|
||||||
msg.Size = uint32(len(payload))
|
msg.Size = uint32(len(payload))
|
||||||
}
|
}
|
||||||
|
msg.meterSize = msg.Size
|
||||||
|
if metrics.Enabled && msg.meterCap.Name != "" { // don't meter non-subprotocol messages
|
||||||
|
metrics.GetOrRegisterMeter(fmt.Sprintf("%s/%s/%d/%#02x", MetricsOutboundTraffic, msg.meterCap.Name, msg.meterCap.Version, msg.meterCode), nil).Mark(int64(msg.meterSize))
|
||||||
|
}
|
||||||
// write header
|
// write header
|
||||||
headbuf := make([]byte, 32)
|
headbuf := make([]byte, 32)
|
||||||
fsize := uint32(len(ptype)) + msg.Size
|
fsize := uint32(len(ptype)) + msg.Size
|
||||||
@ -686,6 +691,7 @@ func (rw *rlpxFrameRW) ReadMsg() (msg Msg, err error) {
|
|||||||
return msg, err
|
return msg, err
|
||||||
}
|
}
|
||||||
msg.Size = uint32(content.Len())
|
msg.Size = uint32(content.Len())
|
||||||
|
msg.meterSize = msg.Size
|
||||||
msg.Payload = content
|
msg.Payload = content
|
||||||
|
|
||||||
// if snappy is enabled, verify and decompress message
|
// if snappy is enabled, verify and decompress message
|
||||||
|
Loading…
Reference in New Issue
Block a user