Merge pull request #3090 from filecoin-project/fix/pubsub-validation-clown-shoes
Fix clown shoes in pubsub validation
This commit is contained in:
commit
ff7f0a9dcd
@ -195,6 +195,8 @@ func fetchCids(
|
|||||||
}
|
}
|
||||||
|
|
||||||
type BlockValidator struct {
|
type BlockValidator struct {
|
||||||
|
self peer.ID
|
||||||
|
|
||||||
peers *lru.TwoQueueCache
|
peers *lru.TwoQueueCache
|
||||||
|
|
||||||
killThresh int
|
killThresh int
|
||||||
@ -211,9 +213,10 @@ type BlockValidator struct {
|
|||||||
keycache map[string]address.Address
|
keycache map[string]address.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
func NewBlockValidator(self peer.ID, chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
||||||
p, _ := lru.New2Q(4096)
|
p, _ := lru.New2Q(4096)
|
||||||
return &BlockValidator{
|
return &BlockValidator{
|
||||||
|
self: self,
|
||||||
peers: p,
|
peers: p,
|
||||||
killThresh: 10,
|
killThresh: 10,
|
||||||
blacklist: blacklist,
|
blacklist: blacklist,
|
||||||
@ -243,6 +246,10 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
if pid == bv.self {
|
||||||
|
return bv.validateLocalBlock(ctx, msg)
|
||||||
|
}
|
||||||
|
|
||||||
// track validation time
|
// track validation time
|
||||||
begin := build.Clock.Now()
|
begin := build.Clock.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -257,25 +264,10 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
|||||||
bv.flagPeer(pid)
|
bv.flagPeer(pid)
|
||||||
}
|
}
|
||||||
|
|
||||||
// make sure the block can be decoded
|
blk, what, err := bv.decodeAndCheckBlock(msg)
|
||||||
blk, err := types.DecodeBlockMsg(msg.GetData())
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("got invalid block over pubsub: ", err)
|
log.Error("got invalid block over pubsub: ", err)
|
||||||
recordFailure("invalid")
|
recordFailure(what)
|
||||||
return pubsub.ValidationReject
|
|
||||||
}
|
|
||||||
|
|
||||||
// check the message limit constraints
|
|
||||||
if len(blk.BlsMessages)+len(blk.SecpkMessages) > build.BlockMessageLimit {
|
|
||||||
log.Warnf("received block with too many messages over pubsub")
|
|
||||||
recordFailure("too_many_messages")
|
|
||||||
return pubsub.ValidationReject
|
|
||||||
}
|
|
||||||
|
|
||||||
// make sure we have a signature
|
|
||||||
if blk.Header.BlockSig == nil {
|
|
||||||
log.Warnf("received block without a signature over pubsub")
|
|
||||||
recordFailure("missing_signature")
|
|
||||||
return pubsub.ValidationReject
|
return pubsub.ValidationReject
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -332,6 +324,45 @@ func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub
|
|||||||
return pubsub.ValidationAccept
|
return pubsub.ValidationAccept
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bv *BlockValidator) validateLocalBlock(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
stats.Record(ctx, metrics.BlockPublished.M(1))
|
||||||
|
|
||||||
|
blk, what, err := bv.decodeAndCheckBlock(msg)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("got invalid local block: %s", err)
|
||||||
|
ctx, _ = tag.New(ctx, tag.Insert(metrics.FailureType, what))
|
||||||
|
stats.Record(ctx, metrics.BlockValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
if count := bv.recvBlocks.add(blk.Header.Cid()); count > 0 {
|
||||||
|
log.Warnf("local block has been seen %d times; ignoring", count)
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
msg.ValidatorData = blk
|
||||||
|
stats.Record(ctx, metrics.BlockValidationSuccess.M(1))
|
||||||
|
return pubsub.ValidationAccept
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bv *BlockValidator) decodeAndCheckBlock(msg *pubsub.Message) (*types.BlockMsg, string, error) {
|
||||||
|
blk, err := types.DecodeBlockMsg(msg.GetData())
|
||||||
|
if err != nil {
|
||||||
|
return nil, "invalid", xerrors.Errorf("error decoding block: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if count := len(blk.BlsMessages) + len(blk.SecpkMessages); count > build.BlockMessageLimit {
|
||||||
|
return nil, "too_many_messages", fmt.Errorf("block contains too many messages (%d)", count)
|
||||||
|
}
|
||||||
|
|
||||||
|
// make sure we have a signature
|
||||||
|
if blk.Header.BlockSig == nil {
|
||||||
|
return nil, "missing_signature", fmt.Errorf("block without a signature")
|
||||||
|
}
|
||||||
|
|
||||||
|
return blk, "", nil
|
||||||
|
}
|
||||||
|
|
||||||
func (bv *BlockValidator) isChainNearSynced() bool {
|
func (bv *BlockValidator) isChainNearSynced() bool {
|
||||||
ts := bv.chain.GetHeaviestTipSet()
|
ts := bv.chain.GetHeaviestTipSet()
|
||||||
timestamp := ts.MinTimestamp()
|
timestamp := ts.MinTimestamp()
|
||||||
@ -485,14 +516,19 @@ func (brc *blockReceiptCache) add(bcid cid.Cid) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type MessageValidator struct {
|
type MessageValidator struct {
|
||||||
|
self peer.ID
|
||||||
mpool *messagepool.MessagePool
|
mpool *messagepool.MessagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
|
func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator {
|
||||||
return &MessageValidator{mp}
|
return &MessageValidator{self: self, mpool: mp}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
if pid == mv.self {
|
||||||
|
return mv.validateLocalMessage(ctx, msg)
|
||||||
|
}
|
||||||
|
|
||||||
stats.Record(ctx, metrics.MessageReceived.M(1))
|
stats.Record(ctx, metrics.MessageReceived.M(1))
|
||||||
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -520,6 +556,45 @@ func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubs
|
|||||||
return pubsub.ValidationAccept
|
return pubsub.ValidationAccept
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (mv *MessageValidator) validateLocalMessage(ctx context.Context, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
// do some lightweight validation
|
||||||
|
stats.Record(ctx, metrics.MessagePublished.M(1))
|
||||||
|
|
||||||
|
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to decode local message: %s", err)
|
||||||
|
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Size() > 32*1024 {
|
||||||
|
log.Warnf("local message is too large! (%dB)", m.Size())
|
||||||
|
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Message.To == address.Undef {
|
||||||
|
log.Warn("local message has invalid destination address")
|
||||||
|
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
if !m.Message.Value.LessThan(types.TotalFilecoinInt) {
|
||||||
|
log.Warnf("local messages has too high value: %s", m.Message.Value)
|
||||||
|
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mv.mpool.VerifyMsgSig(m); err != nil {
|
||||||
|
log.Warnf("signature verification failed for local message: %s", err)
|
||||||
|
stats.Record(ctx, metrics.MessageValidationFailure.M(1))
|
||||||
|
return pubsub.ValidationIgnore
|
||||||
|
}
|
||||||
|
|
||||||
|
stats.Record(ctx, metrics.MessageValidationSuccess.M(1))
|
||||||
|
return pubsub.ValidationAccept
|
||||||
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
||||||
for {
|
for {
|
||||||
_, err := msub.Next(ctx)
|
_, err := msub.Next(ctx)
|
||||||
|
@ -30,9 +30,11 @@ var (
|
|||||||
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
|
LotusInfo = stats.Int64("info", "Arbitrary counter to tag lotus info to", stats.UnitDimensionless)
|
||||||
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
|
ChainNodeHeight = stats.Int64("chain/node_height", "Current Height of the node", stats.UnitDimensionless)
|
||||||
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
|
ChainNodeWorkerHeight = stats.Int64("chain/node_worker_height", "Current Height of workers on the node", stats.UnitDimensionless)
|
||||||
|
MessagePublished = stats.Int64("message/pubished", "Counter for total locally published messages", stats.UnitDimensionless)
|
||||||
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
|
MessageReceived = stats.Int64("message/received", "Counter for total received messages", stats.UnitDimensionless)
|
||||||
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
|
MessageValidationFailure = stats.Int64("message/failure", "Counter for message validation failures", stats.UnitDimensionless)
|
||||||
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
|
MessageValidationSuccess = stats.Int64("message/success", "Counter for message validation successes", stats.UnitDimensionless)
|
||||||
|
BlockPublished = stats.Int64("block/published", "Counter for total locally published blocks", stats.UnitDimensionless)
|
||||||
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
|
BlockReceived = stats.Int64("block/received", "Counter for total received blocks", stats.UnitDimensionless)
|
||||||
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
BlockValidationFailure = stats.Int64("block/failure", "Counter for block validation failures", stats.UnitDimensionless)
|
||||||
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
|
BlockValidationSuccess = stats.Int64("block/success", "Counter for block validation successes", stats.UnitDimensionless)
|
||||||
|
@ -82,7 +82,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
}
|
}
|
||||||
|
|
||||||
v := sub.NewBlockValidator(
|
v := sub.NewBlockValidator(
|
||||||
chain, stmgr,
|
h.ID(), chain, stmgr,
|
||||||
func(p peer.ID) {
|
func(p peer.ID) {
|
||||||
ps.BlacklistPeer(p)
|
ps.BlacklistPeer(p)
|
||||||
h.ConnManager().TagPeer(p, "badblock", -1000)
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
||||||
@ -95,7 +95,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn))
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn))
|
||||||
@ -103,7 +103,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := sub.NewMessageValidator(mpool)
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user