2019-07-08 14:07:09 +00:00
package sub
import (
"context"
2019-12-07 10:49:05 +00:00
"time"
2019-09-18 02:50:03 +00:00
2020-04-01 01:34:23 +00:00
"golang.org/x/xerrors"
2020-02-17 05:51:18 +00:00
lru "github.com/hashicorp/golang-lru"
"github.com/ipfs/go-cid"
2020-01-08 19:10:57 +00:00
logging "github.com/ipfs/go-log/v2"
2019-12-17 07:06:48 +00:00
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
2020-05-05 14:12:06 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-08 14:07:09 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2020-03-02 00:26:09 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
2019-07-08 15:14:36 +00:00
2020-01-07 20:41:26 +00:00
"github.com/filecoin-project/lotus/build"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-03-02 00:26:09 +00:00
"github.com/filecoin-project/lotus/metrics"
2019-07-08 14:07:09 +00:00
)
var log = logging . Logger ( "sub" )
2020-05-05 13:49:43 +00:00
func HandleIncomingBlocks ( ctx context . Context , bsub * pubsub . Subscription , s * chain . Syncer , cmgr connmgr . ConnManager , bv * BlockValidator ) {
2019-07-08 14:07:09 +00:00
for {
msg , err := bsub . Next ( ctx )
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingBlocks loop" )
return
}
2019-07-28 05:35:32 +00:00
log . Error ( "error from block subscription: " , err )
2019-07-08 14:07:09 +00:00
continue
}
2020-02-17 05:51:18 +00:00
blk , ok := msg . ValidatorData . ( * types . BlockMsg )
if ! ok {
log . Warnf ( "pubsub block validator passed on wrong type: %#v" , msg . ValidatorData )
2020-05-05 13:35:03 +00:00
return
2020-01-07 20:41:26 +00:00
}
2020-05-05 14:12:06 +00:00
//nolint:golint
2020-05-05 13:49:43 +00:00
src := peer . ID ( msg . GetFrom ( ) )
2019-07-08 14:07:09 +00:00
go func ( ) {
2019-12-11 20:41:24 +00:00
log . Infof ( "New block over pubsub: %s" , blk . Cid ( ) )
start := time . Now ( )
2019-08-27 18:45:21 +00:00
log . Debug ( "about to fetch messages for block from pubsub" )
2019-08-01 20:40:47 +00:00
bmsgs , err := s . Bsync . FetchMessagesByCids ( context . TODO ( ) , blk . BlsMessages )
2019-07-08 14:07:09 +00:00
if err != nil {
2020-05-05 13:49:43 +00:00
log . Errorf ( "failed to fetch all bls messages for block received over pubusb: %s; flagging source %s" , err , src )
bv . flagPeer ( src )
2019-07-08 14:07:09 +00:00
return
}
2019-08-01 20:40:47 +00:00
smsgs , err := s . Bsync . FetchSignedMessagesByCids ( context . TODO ( ) , blk . SecpkMessages )
if err != nil {
2020-05-05 13:49:43 +00:00
log . Errorf ( "failed to fetch all secpk messages for block received over pubusb: %s; flagging source %s" , err , src )
bv . flagPeer ( src )
2019-08-01 20:40:47 +00:00
return
}
2019-12-11 20:41:24 +00:00
took := time . Since ( start )
log . Infow ( "new block over pubsub" , "cid" , blk . Header . Cid ( ) , "source" , msg . GetFrom ( ) , "msgfetch" , took )
2019-12-07 10:49:05 +00:00
if delay := time . Now ( ) . Unix ( ) - int64 ( blk . Header . Timestamp ) ; delay > 5 {
log . Warnf ( "Received block with large delay %d from miner %s" , delay , blk . Header . Miner )
}
2019-12-17 07:06:48 +00:00
2019-12-19 06:19:15 +00:00
if s . InformNewBlock ( msg . ReceivedFrom , & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : blk . Header ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-12-17 07:06:48 +00:00
} ) {
2019-12-19 06:19:15 +00:00
cmgr . TagPeer ( msg . ReceivedFrom , "blkprop" , 5 )
2019-12-17 07:06:48 +00:00
}
2019-07-08 14:07:09 +00:00
} ( )
}
}
2020-02-17 05:51:18 +00:00
type BlockValidator struct {
peers * lru . TwoQueueCache
killThresh int
recvBlocks * blockReceiptCache
blacklist func ( peer . ID )
}
func NewBlockValidator ( blacklist func ( peer . ID ) ) * BlockValidator {
p , _ := lru . New2Q ( 4096 )
return & BlockValidator {
peers : p ,
2020-05-05 13:49:43 +00:00
killThresh : 10 ,
2020-02-17 05:51:18 +00:00
blacklist : blacklist ,
recvBlocks : newBlockReceiptCache ( ) ,
}
}
func ( bv * BlockValidator ) flagPeer ( p peer . ID ) {
v , ok := bv . peers . Get ( p )
if ! ok {
bv . peers . Add ( p , int ( 1 ) )
return
}
val := v . ( int )
if val >= bv . killThresh {
2020-05-05 13:49:43 +00:00
log . Warnf ( "blacklisting peer %s" , p )
2020-02-17 05:51:18 +00:00
bv . blacklist ( p )
2020-05-05 13:49:43 +00:00
return
2020-02-17 05:51:18 +00:00
}
bv . peers . Add ( p , v . ( int ) + 1 )
}
2020-05-05 13:35:03 +00:00
func ( bv * BlockValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . BlockReceived . M ( 1 ) )
2020-02-17 05:51:18 +00:00
blk , err := types . DecodeBlockMsg ( msg . GetData ( ) )
if err != nil {
log . Error ( "got invalid block over pubsub: " , err )
2020-03-02 00:57:16 +00:00
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , "invalid" ) )
stats . Record ( ctx , metrics . BlockValidationFailure . M ( 1 ) )
2020-02-17 05:51:18 +00:00
bv . flagPeer ( pid )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-17 05:51:18 +00:00
}
if len ( blk . BlsMessages ) + len ( blk . SecpkMessages ) > build . BlockMessageLimit {
log . Warnf ( "received block with too many messages over pubsub" )
2020-03-02 00:57:16 +00:00
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , "too_many_messages" ) )
stats . Record ( ctx , metrics . BlockValidationFailure . M ( 1 ) )
2020-02-17 05:51:18 +00:00
bv . flagPeer ( pid )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-17 05:51:18 +00:00
}
if bv . recvBlocks . add ( blk . Header . Cid ( ) ) > 0 {
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
2020-05-05 13:35:03 +00:00
return pubsub . ValidationIgnore
2020-02-17 05:51:18 +00:00
}
msg . ValidatorData = blk
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . BlockValidationSuccess . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationAccept
2020-02-17 05:51:18 +00:00
}
type blockReceiptCache struct {
blocks * lru . TwoQueueCache
}
func newBlockReceiptCache ( ) * blockReceiptCache {
c , _ := lru . New2Q ( 8192 )
return & blockReceiptCache {
blocks : c ,
}
}
func ( brc * blockReceiptCache ) add ( bcid cid . Cid ) int {
val , ok := brc . blocks . Get ( bcid )
if ! ok {
brc . blocks . Add ( bcid , int ( 1 ) )
return 0
}
brc . blocks . Add ( bcid , val . ( int ) + 1 )
return val . ( int )
}
2020-02-28 01:39:07 +00:00
type MessageValidator struct {
mpool * messagepool . MessagePool
}
func NewMessageValidator ( mp * messagepool . MessagePool ) * MessageValidator {
return & MessageValidator { mp }
}
2020-05-05 13:35:03 +00:00
func ( mv * MessageValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageReceived . M ( 1 ) )
2020-02-28 01:39:07 +00:00
m , err := types . DecodeSignedMessage ( msg . Message . GetData ( ) )
if err != nil {
log . Warnf ( "failed to decode incoming message: %s" , err )
2020-03-02 00:57:16 +00:00
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , "decode" ) )
stats . Record ( ctx , metrics . MessageValidationFailure . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-28 01:39:07 +00:00
}
if err := mv . mpool . Add ( m ) ; err != nil {
2020-03-01 04:15:02 +00:00
log . Debugf ( "failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s" , m . Message . From , m . Message . To , m . Message . Nonce , types . FIL ( m . Message . Value ) , err )
2020-03-02 00:26:09 +00:00
ctx , _ = tag . New (
ctx ,
2020-03-02 00:57:16 +00:00
tag . Insert ( metrics . FailureType , "add" ) ,
2020-03-02 00:26:09 +00:00
)
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageValidationFailure . M ( 1 ) )
2020-05-05 13:35:03 +00:00
if xerrors . Is ( err , messagepool . ErrBroadcastAnyway ) {
return pubsub . ValidationAccept
}
2020-05-05 14:12:06 +00:00
return pubsub . ValidationIgnore
2020-02-28 01:39:07 +00:00
}
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageValidationSuccess . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationAccept
2020-02-28 01:39:07 +00:00
}
2019-12-01 23:11:43 +00:00
func HandleIncomingMessages ( ctx context . Context , mpool * messagepool . MessagePool , msub * pubsub . Subscription ) {
2019-07-08 14:07:09 +00:00
for {
2020-02-28 01:39:07 +00:00
_ , err := msub . Next ( ctx )
2019-07-08 14:07:09 +00:00
if err != nil {
2019-09-17 14:23:08 +00:00
log . Warn ( "error from message subscription: " , err )
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingMessages loop" )
return
}
2019-07-08 14:07:09 +00:00
continue
}
2020-02-28 01:39:07 +00:00
// Do nothing... everything happens in validate
2019-07-08 14:07:09 +00:00
}
}