add some lightweight validation of local messages
This commit is contained in:
parent
884d4ad9df
commit
ae88a99c84
@ -247,7 +247,7 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
||||
|
||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||
if pid == bv.self {
|
||||
return pubsub.ValidationAccept
|
||||
return bv.validateLocalBlock(ctx, msg)
|
||||
}
|
||||
|
||||
// track validation time
|
||||
@ -339,6 +339,42 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
||||
return pubsub.ValidationAccept
|
||||
}
|
||||
|
||||
func (bv *BlockValidator) validateLocalBlock(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
|
||||
stats.Record(ctx, metrics.BlockPublished.M(1))
|
||||
|
||||
// do some lightweight validation for local blocks
|
||||
blk, err := types.DecodeBlockMsg(msg.GetData())
|
||||
if err != nil {
|
||||
log.Warnf("error decoding local block: %s", err)
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
|
||||
log.Warnf("local block with too many messages: %d", len(blk.BlsMessages)+len(blk.SecpkMessages))
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
// make sure we have a signature
|
||||
if blk.Header.BlockSig == nil {
|
||||
log.Warn("local block without a signature")
|
||||
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
// Note we don't actually validate that signature as this is a slow process
|
||||
|
||||
if count := bv.recvBlocks.add(blk.Header.Cid()); count > 0 {
|
||||
log.Warnf("local block has been seen %d times; ignoring", count)
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
msg.ValidatorData = blk
|
||||
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
|
||||
return pubsub.ValidationAccept
|
||||
}
|
||||
|
||||
func (bv *BlockValidator) isChainNearSynced() bool {
|
||||
ts := bv.chain.GetHeaviestTipSet()
|
||||
timestamp := ts.MinTimestamp()
|
||||
@ -502,7 +538,7 @@ func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageVali
|
||||
|
||||
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||
if pid == mv.self {
|
||||
return pubsub.ValidationAccept
|
||||
return mv.validateLocalMessage(ctx, msg)
|
||||
}
|
||||
|
||||
stats.Record(ctx, metrics.MessageReceived.M(1))
|
||||
@ -532,6 +568,45 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
||||
return pubsub.ValidationAccept
|
||||
}
|
||||
|
||||
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
|
||||
// do some lightweight validation
|
||||
stats.Record(ctx, metrics.MessagePublished.M(1))
|
||||
|
||||
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||
if err != nil {
|
||||
log.Warnf("failed to decode local message: %s", err)
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
if m.Size() > 32*1024 {
|
||||
log.Warnf("local message is too large! (%dB)", m.Size())
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
if m.Message.To == address.Undef {
|
||||
log.Warn("local message has invalid destination address")
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
|
||||
log.Warnf("local messages has too high value: %s", m.Message.Value)
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
if err := mv.mpool.VerifyMsgSig(m); err != nil {
|
||||
log.Warnf("signature verification failed for local message: %s", err)
|
||||
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||
return pubsub.ValidationIgnore
|
||||
}
|
||||
|
||||
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
||||
return pubsub.ValidationAccept
|
||||
}
|
||||
|
||||
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
||||
for {
|
||||
_, err := msub.Next(ctx)
|
||||
|
@ -30,9 +30,11 @@ var (
|
||||
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
|
||||
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
|
||||
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
|
||||
MessagePublished = stats.Int64("message/pubished", "Counter for total locally published messages", stats.UnitDimensionless)
|
||||
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
|
||||
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
|
||||
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
|
||||
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
|
||||
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
|
||||
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
||||
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
|
||||
|
Loading…
Reference in New Issue
Block a user