Merge pull request #1301 from filecoin-project/feat/message-filtering
implement basic message filtering
This commit is contained in:
commit
b9c51e359f
@ -153,9 +153,32 @@ func (brc *blockReceiptCache) add(bcid cid.Cid) int {
|
|||||||
return val.(int)
|
return val.(int)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MessageValidator struct {
|
||||||
|
mpool *messagepool.MessagePool
|
||||||
|
}
|
||||||
|
|
||||||
|
func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
|
||||||
|
return &MessageValidator{mp}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
|
||||||
|
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("failed to decode incoming message: %s", err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := mv.mpool.Add(m); err != nil {
|
||||||
|
log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool, msub *pubsub.Subscription) {
|
||||||
for {
|
for {
|
||||||
msg, err := msub.Next(ctx)
|
_, err := msub.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Warn("error from message subscription: ", err)
|
log.Warn("error from message subscription: ", err)
|
||||||
if ctx.Err() != nil {
|
if ctx.Err() != nil {
|
||||||
@ -165,15 +188,6 @@ func HandleIncomingMessages(ctx context.Context, mpool *messagepool.MessagePool,
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
m, ok := msg.ValidatorData.(*types.SignedMessage)
|
// Do nothing... everything happens in validate
|
||||||
if !ok {
|
|
||||||
log.Errorf("message validator func passed on wrong type: %#v", msg.ValidatorData)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := mpool.Add(m); err != nil {
|
|
||||||
log.Warnf("failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s", m.Message.From, m.Message.To, m.Message.Nonce, types.FIL(m.Message.Value), err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
1
go.mod
1
go.mod
@ -84,7 +84,6 @@ require (
|
|||||||
github.com/multiformats/go-varint v0.0.2
|
github.com/multiformats/go-varint v0.0.2
|
||||||
github.com/opentracing/opentracing-go v1.1.0
|
github.com/opentracing/opentracing-go v1.1.0
|
||||||
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
|
github.com/polydawn/refmt v0.0.0-20190809202753-05966cbd336a
|
||||||
github.com/prometheus/client_golang v0.9.3-0.20190127221311-3c4408c8b829
|
|
||||||
github.com/prometheus/common v0.2.0
|
github.com/prometheus/common v0.2.0
|
||||||
github.com/stretchr/testify v1.4.0
|
github.com/stretchr/testify v1.4.0
|
||||||
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
|
github.com/whyrusleeping/bencher v0.0.0-20190829221104-bb6607aa8bba
|
||||||
|
@ -16,7 +16,6 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/blocksync"
|
"github.com/filecoin-project/lotus/chain/blocksync"
|
||||||
"github.com/filecoin-project/lotus/chain/messagepool"
|
"github.com/filecoin-project/lotus/chain/messagepool"
|
||||||
"github.com/filecoin-project/lotus/chain/sub"
|
"github.com/filecoin-project/lotus/chain/sub"
|
||||||
"github.com/filecoin-project/lotus/chain/types"
|
|
||||||
"github.com/filecoin-project/lotus/node/hello"
|
"github.com/filecoin-project/lotus/node/hello"
|
||||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
"github.com/filecoin-project/lotus/node/modules/helpers"
|
"github.com/filecoin-project/lotus/node/modules/helpers"
|
||||||
@ -78,18 +77,9 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := func(ctx context.Context, pid peer.ID, msg *pubsub.Message) bool {
|
v := sub.NewMessageValidator(mpool)
|
||||||
m, err := types.DecodeSignedMessage(msg.GetData())
|
|
||||||
if err != nil {
|
|
||||||
log.Errorf("got incorrectly formatted Message: %s", err)
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
msg.ValidatorData = m
|
if err := ps.RegisterTopicValidator(MessagesTopic, v.Validate); err != nil {
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(MessagesTopic, v); err != nil {
|
|
||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user