Add block metrics to incoming pubsub validate funcs

This commit is contained in:
Nate Walck 2020-03-01 19:57:16 -05:00
parent 33af2409e8
commit 8283bb994a
2 changed files with 52 additions and 16 deletions

View File

@@ -111,15 +111,25 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
} }
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
stats.Record(ctx, metrics.BlockReceived.M(1))
ctx, _ = tag.New(
ctx,
tag.Insert(metrics.PeerID, pid.String()),
tag.Insert(metrics.ReceivedFrom, msg.ReceivedFrom.String()),
)
blk, err := types.DecodeBlockMsg(msg.GetData()) blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil { if err != nil {
log.Error("got invalid block over pubsub: ", err) log.Error("got invalid block over pubsub: ", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid) bv.flagPeer(pid)
return false return false
} }
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit { if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
log.Warnf("received block with too many messages over pubsub") log.Warnf("received block with too many messages over pubsub")
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid) bv.flagPeer(pid)
return false return false
} }
@@ -131,6 +141,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
} }
msg.ValidatorData = blk msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
return true return true
} }
@@ -166,11 +177,13 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
} }
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
stats.Record(ctx, metrics.MessageReceived.M(1))
ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String())) ctx, _ = tag.New(ctx, tag.Insert(metrics.PeerID, pid.String()))
m, err := types.DecodeSignedMessage(msg.Message.GetData()) m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil { if err != nil {
log.Warnf("failed to decode incoming message: %s", err) log.Warnf("failed to decode incoming message: %s", err)
stats.Record(ctx, metrics.MessageDecodeFailure.M(1)) ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return false return false
} }
@@ -181,11 +194,12 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
tag.Insert(metrics.MessageFrom, m.Message.From.String()), tag.Insert(metrics.MessageFrom, m.Message.From.String()),
tag.Insert(metrics.MessageTo, m.Message.To.String()), tag.Insert(metrics.MessageTo, m.Message.To.String()),
tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)), tag.Insert(metrics.MessageNonce, fmt.Sprint(m.Message.Nonce)),
tag.Insert(metrics.FailureType, "add"),
) )
stats.Record(ctx, metrics.MessageAddFailure.M(1)) stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return false return false
} }
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return true return true
} }

View File

@@ -12,9 +12,11 @@ var (
Commit, _ = tag.NewKey("commit") Commit, _ = tag.NewKey("commit")
RPCMethod, _ = tag.NewKey("method") RPCMethod, _ = tag.NewKey("method")
PeerID, _ = tag.NewKey("peer_id") PeerID, _ = tag.NewKey("peer_id")
FailureType, _ = tag.NewKey("failure_type")
MessageFrom, _ = tag.NewKey("message_from") MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to") MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce") MessageNonce, _ = tag.NewKey("message_nonce")
ReceivedFrom, _ = tag.NewKey("received_from")
) )
// Measures // Measures
@@ -22,8 +24,12 @@ var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
MessageAddFailure = stats.Int64("message/add_faliure", "Counter for messages that failed to be added", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageDecodeFailure = stats.Int64("message/decode_faliure", "Counter for messages that failed to be decoded", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless) PeerCount = stats.Int64("peer/count", "Current number of FIL peers", stats.UnitDimensionless)
RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless) RPCInvalidMethod = stats.Int64("rpc/invalid_method", "Total number of invalid RPC methods called", stats.UnitDimensionless)
RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless) RPCRequestError = stats.Int64("rpc/request_error", "Total number of request errors handled", stats.UnitDimensionless)
@@ -48,14 +54,30 @@ var DefaultViews = []*view.View{
Aggregation: view.LastValue(), Aggregation: view.LastValue(),
}, },
&view.View{ &view.View{
Measure: MessageAddFailure, Measure: BlockReceived,
Aggregation: view.Count(), Aggregation: view.Count(),
TagKeys: []tag.Key{MessageFrom, MessageTo, MessageNonce},
}, },
&view.View{ &view.View{
Measure: MessageDecodeFailure, Measure: BlockValidationFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{FailureType, PeerID},
},
&view.View{
Measure: BlockValidationSuccess,
Aggregation: view.Count(),
},
&view.View{
Measure: MessageReceived,
Aggregation: view.Count(),
},
&view.View{
Measure: MessageValidationFailure,
Aggregation: view.Count(),
TagKeys: []tag.Key{FailureType, MessageFrom, MessageTo, MessageNonce},
},
&view.View{
Measure: MessageValidationSuccess,
Aggregation: view.Count(), Aggregation: view.Count(),
TagKeys: []tag.Key{PeerID},
}, },
&view.View{ &view.View{
Measure: PeerCount, Measure: PeerCount,