From 5824714cdc1a5488f57db13b2ecfa7572f28e821 Mon Sep 17 00:00:00 2001 From: vyzo Date: Tue, 5 May 2020 16:35:03 +0300 Subject: [PATCH] use extended validator interface --- chain/sub/incoming.go | 23 ++++++++++++++--------- 1 file changed, 14 insertions(+), 9 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 1f994d317..92efec302 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -39,6 +39,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha blk, ok := msg.ValidatorData.(*types.BlockMsg) if !ok { log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData) + return } go func() { @@ -111,7 +112,7 @@ func (bv *BlockValidator) flagPeer(p peer.ID) { bv.peers.Add(p, v.(int)+1) } -func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { +func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { stats.Record(ctx, metrics.BlockReceived.M(1)) blk, err := types.DecodeBlockMsg(msg.GetData()) if err != nil { @@ -119,7 +120,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid")) stats.Record(ctx, metrics.BlockValidationFailure.M(1)) bv.flagPeer(pid) - return false + return pubsub.ValidationReject } if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit { @@ -127,18 +128,18 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages")) stats.Record(ctx, metrics.BlockValidationFailure.M(1)) bv.flagPeer(pid) - return false + return pubsub.ValidationReject } if bv.recvBlocks.add(blk.Header.Cid()) > 0 { // TODO: once these changes propagate to the network, we can consider // dropping peers who send us the same block multiple times - return false + return pubsub.ValidationIgnore } msg.ValidatorData = blk stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) - return true + return pubsub.ValidationAccept } type blockReceiptCache struct { @@ -172,14 +173,14 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { return &MessageValidator{mp} } -func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { +func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { stats.Record(ctx, metrics.MessageReceived.M(1)) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { log.Warnf("failed to decode incoming message: %s", err) ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode")) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) - return false + return pubsub.ValidationReject } if err := mv.mpool.Add(m); err != nil { @@ -189,10 +190,14 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs tag.Insert(metrics.FailureType, "add"), ) stats.Record(ctx, metrics.MessageValidationFailure.M(1)) - return xerrors.Is(err, messagepool.ErrBroadcastAnyway) + if xerrors.Is(err, messagepool.ErrBroadcastAnyway) { + return pubsub.ValidationAccept + } else { + return pubsub.ValidationIgnore + } } stats.Record(ctx, metrics.MessageValidationSuccess.M(1)) - return true + return pubsub.ValidationAccept } func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {