use extended validator interface

This commit is contained in:
vyzo 2020-05-05 16:35:03 +03:00
parent e3f6e586ee
commit 5824714cdc

View File

@ -39,6 +39,7 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
blk, ok := msg.ValidatorData.(*types.BlockMsg) blk, ok := msg.ValidatorData.(*types.BlockMsg)
if !ok { if !ok {
log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData) log.Warnf("pubsub block validator passed on wrong type: %#v", msg.ValidatorData)
return
} }
go func() { go func() {
@ -111,7 +112,7 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
bv.peers.Add(p, v.(int)+1) bv.peers.Add(p, v.(int)+1)
} }
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.BlockReceived.M(1)) stats.Record(ctx, metrics.BlockReceived.M(1))
blk, err := types.DecodeBlockMsg(msg.GetData()) blk, err := types.DecodeBlockMsg(msg.GetData())
if err != nil { if err != nil {
@ -119,7 +120,7 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid")) ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "invalid"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1)) stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid) bv.flagPeer(pid)
return false return pubsub.ValidationReject
} }
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit { if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
@ -127,18 +128,18 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages")) ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "too_many_messages"))
stats.Record(ctx, metrics.BlockValidationFailure.M(1)) stats.Record(ctx, metrics.BlockValidationFailure.M(1))
bv.flagPeer(pid) bv.flagPeer(pid)
return false return pubsub.ValidationReject
} }
if bv.recvBlocks.add(blk.Header.Cid()) > 0 { if bv.recvBlocks.add(blk.Header.Cid()) > 0 {
// TODO: once these changes propagate to the network, we can consider // TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times // dropping peers who send us the same block multiple times
return false return pubsub.ValidationIgnore
} }
msg.ValidatorData = blk msg.ValidatorData = blk
stats.Record(ctx, metrics.BlockValidationSuccess.M(1)) stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
return true return pubsub.ValidationAccept
} }
type blockReceiptCache struct { type blockReceiptCache struct {
@ -172,14 +173,14 @@ func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
return &MessageValidator{mp} return &MessageValidator{mp}
} }
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
stats.Record(ctx, metrics.MessageReceived.M(1)) stats.Record(ctx, metrics.MessageReceived.M(1))
m, err := types.DecodeSignedMessage(msg.Message.GetData()) m, err := types.DecodeSignedMessage(msg.Message.GetData())
if err != nil { if err != nil {
log.Warnf("failed to decode incoming message: %s", err) log.Warnf("failed to decode incoming message: %s", err)
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode")) ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, "decode"))
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return false return pubsub.ValidationReject
} }
if err := mv.mpool.Add(m); err != nil { if err := mv.mpool.Add(m); err != nil {
@ -189,10 +190,14 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
tag.Insert(metrics.FailureType, "add"), tag.Insert(metrics.FailureType, "add"),
) )
stats.Record(ctx, metrics.MessageValidationFailure.M(1)) stats.Record(ctx, metrics.MessageValidationFailure.M(1))
return xerrors.Is(err, messagepool.ErrBroadcastAnyway) if xerrors.Is(err, messagepool.ErrBroadcastAnyway) {
return pubsub.ValidationAccept
} else {
return pubsub.ValidationIgnore
}
} }
stats.Record(ctx, metrics.MessageValidationSuccess.M(1)) stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
return true return pubsub.ValidationAccept
} }
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) { func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {