Merge pull request #3372 from filecoin-project/fix/distinguish-local-msg-errors

distinguish local message validation failures from remote
This commit is contained in:
Łukasz Magiera 2020-08-28 12:07:52 +02:00 committed by GitHub
commit 7aed16ce4c
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
2 changed files with 69 additions and 17 deletions

View File

@ -262,16 +262,15 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
stats.Record(ctx, metrics.BlockReceived.M(1)) stats.Record(ctx, metrics.BlockReceived.M(1))
recordFailure := func(what string) { recordFailureFlagPeer := func(what string) {
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what)) recordFailure(ctx, metrics.BlockValidationFailure, what)
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid) bv.flagPeer(pid)
} }
blk, what, err := bv.decodeAndCheckBlock(msg) blk, what, err := bv.decodeAndCheckBlock(msg)
if err != nil { if err != nil {
log.Error("got invalid block over pubsub: ", err) log.Error("got invalid block over pubsub: ", err)
recordFailure(what) recordFailureFlagPeer(what)
return pubsub.ValidationReject return pubsub.ValidationReject
} }
@ -279,7 +278,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
err = bv.validateMsgMeta(ctx, blk) err = bv.validateMsgMeta(ctx, blk)
if err != nil { if err != nil {
log.Warnf("error validating message metadata: %s", err) log.Warnf("error validating message metadata: %s", err)
recordFailure("invalid_block_meta") recordFailureFlagPeer("invalid_block_meta")
return pubsub.ValidationReject return pubsub.ValidationReject
} }
@ -294,7 +293,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
if err != nil { if err != nil {
if err != ErrSoftFailure && bv.isChainNearSynced() { if err != ErrSoftFailure && bv.isChainNearSynced() {
log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message") log.Warnf("received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message")
recordFailure("unknown_miner") recordFailureFlagPeer("unknown_miner")
return pubsub.ValidationReject return pubsub.ValidationReject
} }
@ -305,13 +304,13 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
err = sigs.CheckBlockSignature(ctx, blk.Header, key) err = sigs.CheckBlockSignature(ctx, blk.Header, key)
if err != nil { if err != nil {
log.Errorf("block signature verification failed: %s", err) log.Errorf("block signature verification failed: %s", err)
recordFailure("signature_verification_failed") recordFailureFlagPeer("signature_verification_failed")
return pubsub.ValidationReject return pubsub.ValidationReject
} }
if blk.Header.ElectionProof.WinCount < 1 { if blk.Header.ElectionProof.WinCount < 1 {
log.Errorf("block is not claiming to be winning") log.Errorf("block is not claiming to be winning")
recordFailure("not_winning") recordFailureFlagPeer("not_winning")
return pubsub.ValidationReject return pubsub.ValidationReject
} }
@ -546,9 +545,9 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) log.Debugf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
ctx, _ = tag.New( ctx, _ = tag.New(
ctx, ctx,
tag.Insert(metrics.FailureType, "add"), tag.Upsert(metrics.Local, "false"),
) )
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "add")
switch { switch {
case xerrors.Is(err, messagepool.ErrBroadcastAnyway): case xerrors.Is(err, messagepool.ErrBroadcastAnyway):
fallthrough fallthrough
@ -565,37 +564,41 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
} }
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult { func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.Local, "true"),
)
// do some lightweight validation // do some lightweight validation
stats.Record(ctx, metrics.MessagePublished.M(1)) stats.Record(ctx, metrics.MessagePublished.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData()) m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil { if err != nil {
log.Warnf("failed to decode local message: %s", err) log.Warnf("failed to decode local message: %s", err)
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "decode")
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
} }
if m.Size() > 32*1024 { if m.Size() > 32*1024 {
log.Warnf("local message is too large! (%dB)", m.Size()) log.Warnf("local message is too large! (%dB)", m.Size())
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "oversize")
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
} }
if m.Message.To == address.Undef { if m.Message.To == address.Undef {
log.Warn("local message has invalid destination address") log.Warn("local message has invalid destination address")
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "undef-addr")
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
} }
if !m.Message.Value.LessThan(types.TotalFilecoinInt) { if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
log.Warnf("local messages has too high value: %s", m.Message.Value) log.Warnf("local messages has too high value: %s", m.Message.Value)
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "value-too-high")
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
} }
if err := mv.mpool.VerifyMsgSig(m); err != nil { if err := mv.mpool.VerifyMsgSig(m); err != nil {
log.Warnf("signature verification failed for local message: %s", err) log.Warnf("signature verification failed for local message: %s", err)
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) recordFailure(ctx, metrics.MessageValidationFailure, "verify-sig")
return pubsub.ValidationIgnore return pubsub.ValidationIgnore
} }
@ -618,3 +621,11 @@ func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool,
// Do nothing... everything happens in validate // Do nothing... everything happens in validate
} }
} }
func recordFailure(ctx context.Context, metric *stats.Int64Measure, failureType string) {
ctx, _ = tag.New(
ctx,
tag.Upsert(metrics.FailureType, failureType),
)
stats.Record(ctx, metric.M(1))
}

View File

@ -19,6 +19,7 @@ var (
Commit, _ = tag.NewKey("commit") Commit, _ = tag.NewKey("commit")
PeerID, _ = tag.NewKey("peer_id") PeerID, _ = tag.NewKey("peer_id")
FailureType, _ = tag.NewKey("failure_type") FailureType, _ = tag.NewKey("failure_type")
Local, _ = tag.NewKey("local")
MessageFrom, _ = tag.NewKey("message_from") MessageFrom, _ = tag.NewKey("message_from")
MessageTo, _ = tag.NewKey("message_to") MessageTo, _ = tag.NewKey("message_to")
MessageNonce, _ = tag.NewKey("message_nonce") MessageNonce, _ = tag.NewKey("message_nonce")
@ -30,7 +31,7 @@ var (
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless) LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless) ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless) ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
MessagePublished = stats.Int64("message/pubished", "Counter for total locally published messages", stats.UnitDimensionless) MessagePublished = stats.Int64("message/published", "Counter for total locally published messages", stats.UnitDimensionless)
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless) MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless) MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless) MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
@ -82,6 +83,10 @@ var (
Measure: BlockValidationDurationMilliseconds, Measure: BlockValidationDurationMilliseconds,
Aggregation: defaultMillisecondsDistribution, Aggregation: defaultMillisecondsDistribution,
} }
MessagePublishedView = &view.View{
Measure: MessagePublished,
Aggregation: view.Count(),
}
MessageReceivedView = &view.View{ MessageReceivedView = &view.View{
Measure: MessageReceived, Measure: MessageReceived,
Aggregation: view.Count(), Aggregation: view.Count(),
@ -89,7 +94,7 @@ var (
MessageValidationFailureView = &view.View{ MessageValidationFailureView = &view.View{
Measure: MessageValidationFailure, Measure: MessageValidationFailure,
Aggregation: view.Count(), Aggregation: view.Count(),
TagKeys: []tag.Key{FailureType}, TagKeys: []tag.Key{FailureType, Local},
} }
MessageValidationSuccessView = &view.View{ MessageValidationSuccessView = &view.View{
Measure: MessageValidationSuccess, Measure: MessageValidationSuccess,
@ -99,6 +104,34 @@ var (
Measure: PeerCount, Measure: PeerCount,
Aggregation: view.LastValue(), Aggregation: view.LastValue(),
} }
PubsubPublishMessageView = &view.View{
Measure: PubsubPublishMessage,
Aggregation: view.Count(),
}
PubsubDeliverMessageView = &view.View{
Measure: PubsubDeliverMessage,
Aggregation: view.Count(),
}
PubsubRejectMessageView = &view.View{
Measure: PubsubRejectMessage,
Aggregation: view.Count(),
}
PubsubDuplicateMessageView = &view.View{
Measure: PubsubDuplicateMessage,
Aggregation: view.Count(),
}
PubsubRecvRPCView = &view.View{
Measure: PubsubRecvRPC,
Aggregation: view.Count(),
}
PubsubSendRPCView = &view.View{
Measure: PubsubSendRPC,
Aggregation: view.Count(),
}
PubsubDropRPCView = &view.View{
Measure: PubsubDropRPC,
Aggregation: view.Count(),
}
) )
// DefaultViews is an array of OpenCensus views for metric gathering purposes // DefaultViews is an array of OpenCensus views for metric gathering purposes
@ -110,10 +143,18 @@ var DefaultViews = append([]*view.View{
BlockValidationFailureView, BlockValidationFailureView,
BlockValidationSuccessView, BlockValidationSuccessView,
BlockValidationDurationView, BlockValidationDurationView,
MessagePublishedView,
MessageReceivedView, MessageReceivedView,
MessageValidationFailureView, MessageValidationFailureView,
MessageValidationSuccessView, MessageValidationSuccessView,
PeerCountView, PeerCountView,
PubsubPublishMessageView,
PubsubDeliverMessageView,
PubsubRejectMessageView,
PubsubDuplicateMessageView,
PubsubRecvRPCView,
PubsubSendRPCView,
PubsubDropRPCView,
}, },
rpcmetrics.DefaultViews...) rpcmetrics.DefaultViews...)