From 00efd097c7729f13e5c286bc7ba5778bf1082bbd Mon Sep 17 00:00:00 2001 From: whyrusleeping Date: Thu, 27 Feb 2020 17:39:07 -0800 Subject: [PATCH] implement basic message filtering --- chain/sub/incoming.go | 36 +++++++++++++++++++++++++----------- go.mod | 1 - node/modules/services.go | 14 ++------------ 3 files changed, 27 insertions(+), 24 deletions(-) diff --git a/chain/sub/incoming.go b/chain/sub/incoming.go index 6ee41aabf..b4bd8b3fa 100644 --- a/chain/sub/incoming.go +++ b/chain/sub/incoming.go @@ -153,9 +153,32 @@ func (brc *blockReceiptCache) add(bcid cid.Cid) int { return val.(int) } +type MessageValidator struct { + mpool *messagepool.MessagePool +} + +func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator { + return &MessageValidator{mp} +} + +func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { + m, err := types.DecodeSignedMessage(msg.Message.GetData()) + if err != nil { + log.Warnf("failed to decode incoming message: %s", err) + return false + } + + if err := mv.mpool.Add(m); err != nil { + log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) + return false + } + + return true +} + func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) { for { - msg, err := msub.Next(ctx) + _, err := msub.Next(ctx) if err != nil { log.Warn("error from message subscription: ", err) if ctx.Err() != nil { @@ -165,15 +188,6 @@ func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, continue } - m, ok := msg.ValidatorData.(*types.SignedMessage) - if !ok { - log.Errorf("message validator func passed on wrong type: %#v", msg.ValidatorData) - continue - } - - if err := mpool.Add(m); err != nil { - log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err) - continue - } + // Do nothing... everything happens in validate } } diff --git a/go.mod b/go.mod index 052fb4878..00076cee5 100644 --- a/go.mod +++ b/go.mod @@ -84,7 +84,6 @@ require ( github.com/multiformats/go-varint v0.0.2 github.com/opentracing/opentracing-go v1.1.0 github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a - github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829 github.com/prometheus/common v0.2.0 github.com/stretchr/testify v1.4.0 github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba diff --git a/node/modules/services.go b/node/modules/services.go index 0ae6d98f0..7207ec149 100644 --- a/node/modules/services.go +++ b/node/modules/services.go @@ -16,7 +16,6 @@ import ( "github.com/filecoin-project/lotus/chain/blocksync" "github.com/filecoin-project/lotus/chain/messagepool" "github.com/filecoin-project/lotus/chain/sub" - "github.com/filecoin-project/lotus/chain/types" "github.com/filecoin-project/lotus/node/hello" "github.com/filecoin-project/lotus/node/modules/dtypes" "github.com/filecoin-project/lotus/node/modules/helpers" @@ -78,18 +77,9 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub panic(err) } - v := func(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool { - m, err := types.DecodeSignedMessage(msg.GetData()) - if err != nil { - log.Errorf("got incorrectly formatted Message: %s", err) - return false - } + v := sub.NewMessageValidator(mpool) - msg.ValidatorData = m - return true - } - - if err := ps.RegisterTopicValidator(MessagesTopic, v); err != nil { + if err := ps.RegisterTopicValidator(MessagesTopic, v.Validate); err != nil { panic(err) }