From 884d4ad9dfcfe1dc30e8ecde4db75f71437c0b12 Mon Sep 17 00:00:00 2001 From: vyzo Date: Sun, 16 Aug 2020 20:46:19 +0300 Subject: [PATCH] fix clown shoes pubsub validation: we always accept our own self-published messages --- chain/sub/incoming.go | 18 +++++++++++++++--- node/modules/services.go | 6 +++--- 2 files changed, 18 insertions(+), 6 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 75f23cd3b..fb00e4ae1 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -195,6 +195,8 @@ func fetchCids( } type BlockValidator struct { + self peer.ID + peers *lru.TwoQueueCache killThresh int @@ -211,9 +213,10 @@ type BlockValidator struct { keycache map[string]address.Address } -func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator { +func NewBlockValidator(self peer.ID, chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator { p, _ := lru.New2Q(4096) return &BlockValidator{ + self: self, peers: p, killThresh: 10, blacklist: blacklist, @@ -243,6 +246,10 @@ func (bv *BlockValidator) flagPeer(p peer.ID) { } func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + if pid == bv.self { + return pubsub.ValidationAccept + } + // track validation time begin := build.Clock.Now() defer func() { @@ -485,14 +492,19 @@ func (brc *blockReceiptCache) add(bcid cid.Cid) int { } type MessageValidator struct { + self peer.ID mpool *messagepool.MessagePool } -func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { - return &MessageValidator{mp} +func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator { + return &MessageValidator{self: self, mpool: mp} } func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult { + if pid == mv.self { + return pubsub.ValidationAccept + } + stats.Record(ctx, metrics.MessageReceived.M(1)) m, err := types.DecodeSignedMessage(msg.Message.GetData()) if err != nil { diff --git a/node/modules/services.go b/node/modules/services.go index 0d148ffb4..013a6c0af 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -82,7 +82,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P } v := sub.NewBlockValidator( - chain, stmgr, + h.ID(), chain, stmgr, func(p peer.ID) { ps.BlacklistPeer(p) h.ConnManager().TagPeer(p, "badblock", -1000) @@ -95,7 +95,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager()) } -func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) { +func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) { ctx := helpers.LifecycleCtx(mctx, lc) msgsub, err := ps.Subscribe(build.MessagesTopic(nn)) @@ -103,7 +103,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub panic(err) } - v := sub.NewMessageValidator(mpool) + v := sub.NewMessageValidator(h.ID(), mpool) if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil { panic(err)