flag block source when messages cannot be retrieved
This commit is contained in:
parent
5824714cdc
commit
4e5179557d
@ -24,7 +24,7 @@ import (
|
|||||||
|
|
||||||
var log = logging.Logger("sub")
|
var log = logging.Logger("sub")
|
||||||
|
|
||||||
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager) {
|
func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *chain.Syncer, cmgr connmgr.ConnManager, bv *BlockValidator) {
|
||||||
for {
|
for {
|
||||||
msg, err := bsub.Next(ctx)
|
msg, err := bsub.Next(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -42,6 +42,8 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
src := peer.ID(msg.GetFrom())
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
log.Infof("New block over pubsub: %s", blk.Cid())
|
log.Infof("New block over pubsub: %s", blk.Cid())
|
||||||
|
|
||||||
@ -49,13 +51,15 @@ func HandleIncomingBlocks(ctx context.Context, bsub *pubsub.Subscription, s *cha
|
|||||||
log.Debug("about to fetch messages for block from pubsub")
|
log.Debug("about to fetch messages for block from pubsub")
|
||||||
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
|
bmsgs, err := s.Bsync.FetchMessagesByCids(context.TODO(), blk.BlsMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s", err)
|
log.Errorf("failed to fetch all bls messages for block received over pubusb: %s; flagging source %s", err, src)
|
||||||
|
bv.flagPeer(src)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
|
smsgs, err := s.Bsync.FetchSignedMessagesByCids(context.TODO(), blk.SecpkMessages)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s", err)
|
log.Errorf("failed to fetch all secpk messages for block received over pubusb: %s; flagging source %s", err, src)
|
||||||
|
bv.flagPeer(src)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -90,7 +94,7 @@ func NewBlockValidator(blacklist func(peer.ID)) *BlockValidator {
|
|||||||
p, _ := lru.New2Q(4096)
|
p, _ := lru.New2Q(4096)
|
||||||
return &BlockValidator{
|
return &BlockValidator{
|
||||||
peers: p,
|
peers: p,
|
||||||
killThresh: 5,
|
killThresh: 10,
|
||||||
blacklist: blacklist,
|
blacklist: blacklist,
|
||||||
recvBlocks: newBlockReceiptCache(),
|
recvBlocks: newBlockReceiptCache(),
|
||||||
}
|
}
|
||||||
@ -106,7 +110,9 @@ func (bv *BlockValidator) flagPeer(p peer.ID) {
|
|||||||
val := v.(int)
|
val := v.(int)
|
||||||
|
|
||||||
if val >= bv.killThresh {
|
if val >= bv.killThresh {
|
||||||
|
log.Warnf("blacklisting peer %s", p)
|
||||||
bv.blacklist(p)
|
bv.blacklist(p)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
bv.peers.Add(p, v.(int)+1)
|
bv.peers.Add(p, v.(int)+1)
|
||||||
|
@ -75,7 +75,7 @@ func HandleIncomingBlocks(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.P
|
|||||||
panic(err)
|
panic(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager())
|
go sub.HandleIncomingBlocks(ctx, blocksub, s, h.ConnManager(), v)
|
||||||
}
|
}
|
||||||
|
|
||||||
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
|
func HandleIncomingMessages(mctx helpers.MetricsCtx, lc fx.Lifecycle, ps *pubsub.PubSub, mpool *messagepool.MessagePool, nn dtypes.NetworkName) {
|
||||||
|
Loading…
Reference in New Issue
Block a user