fix clown shoes pubsub validation: we always accept our own self-published messages
This commit is contained in:
parent
9a23ede4fd
commit
884d4ad9df
@ -195,6 +195,8 @@ func fetchCids(
|
|||||||
}
|
}
|
||||||
|
|
||||||
type BlockValidator struct {
|
type BlockValidator struct {
|
||||||
|
self peer.ID
|
||||||
|
|
||||||
peers *lru.TwoQueueCache
|
peers *lru.TwoQueueCache
|
||||||
|
|
||||||
killThresh int
|
killThresh int
|
||||||
@ -211,9 +213,10 @@ type BlockValidator struct {
|
|||||||
keycache map[string]address.Address
|
keycache map[string]address.Address
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewBlockValidator(chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
func NewBlockValidator(self peer.ID, chain *store.ChainStore, stmgr *stmgr.StateManager, blacklist func(peer.ID)) *BlockValidator {
|
||||||
p, _ := lru.New2Q(4096)
|
p, _ := lru.New2Q(4096)
|
||||||
return &BlockValidator{
|
return &BlockValidator{
|
||||||
|
self: self,
|
||||||
peers: p,
|
peers: p,
|
||||||
killThresh: 10,
|
killThresh: 10,
|
||||||
blacklist: blacklist,
|
blacklist: blacklist,
|
||||||
@ -243,6 +246,10 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
func (bv *BlockValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
if pid == bv.self {
|
||||||
|
return pubsub.ValidationAccept
|
||||||
|
}
|
||||||
|
|
||||||
// track validation time
|
// track validation time
|
||||||
begin := build.Clock.Now()
|
begin := build.Clock.Now()
|
||||||
defer func() {
|
defer func() {
|
||||||
@ -485,14 +492,19 @@ func (brc *blockReceiptCache) add(bcid cid.Cid) int {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type MessageValidator struct {
|
type MessageValidator struct {
|
||||||
|
self peer.ID
|
||||||
mpool *messagepool.MessagePool
|
mpool *messagepool.MessagePool
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewMessageValidator(mp *messagepool.MessagePool) *MessageValidator {
|
func NewMessageValidator(self peer.ID, mp *messagepool.MessagePool) *MessageValidator {
|
||||||
return &MessageValidator{mp}
|
return &MessageValidator{self: self, mpool: mp}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
func (mv *MessageValidator) Validate(ctx context.Context, pid peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
|
||||||
|
if pid == mv.self {
|
||||||
|
return pubsub.ValidationAccept
|
||||||
|
}
|
||||||
|
|
||||||
stats.Record(ctx, metrics.MessageReceived.M(1))
|
stats.Record(ctx, metrics.MessageReceived.M(1))
|
||||||
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
m, err := types.DecodeSignedMessage(msg.Message.GetData())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
@ -82,7 +82,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
}
|
}
|
||||||
|
|
||||||
v := sub.NewBlockValidator(
|
v := sub.NewBlockValidator(
|
||||||
chain, stmgr,
|
h.ID(), chain, stmgr,
|
||||||
func(p peer.ID) {
|
func(p peer.ID) {
|
||||||
ps.BlacklistPeer(p)
|
ps.BlacklistPeer(p)
|
||||||
h.ConnManager().TagPeer(p, "badblock", -1000)
|
h.ConnManager().TagPeer(p, "badblock", -1000)
|
||||||
@ -95,7 +95,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, bserv, h.ConnManager())
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, h host.Host, nn dtypes.NetworkName) {
|
||||||
ctx := helpers.LifecycleCtx(mctx, lc)
|
ctx := helpers.LifecycleCtx(mctx, lc)
|
||||||
|
|
||||||
msgsub, err := ps.Subscribe(build.MessagesTopic(nn))
|
msgsub, err := ps.Subscribe(build.MessagesTopic(nn))
|
||||||
@ -103,7 +103,7 @@ func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
v := sub.NewMessageValidator(mpool)
|
v := sub.NewMessageValidator(h.ID(), mpool)
|
||||||
|
|
||||||
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
if err := ps.RegisterTopicValidator(build.MessagesTopic(nn), v.Validate); err != nil {
|
||||||
panic(err)
|
panic(err)
|
||||||
|
Loading…
Reference in New Issue
Block a user