2019-07-08 14:07:09 +00:00
package sub
import (
2022-02-10 00:40:27 +00:00
"bytes"
2019-07-08 14:07:09 +00:00
"context"
2022-02-10 00:21:05 +00:00
"encoding/binary"
2022-02-08 10:53:25 +00:00
"sync"
2020-09-08 07:39:16 +00:00
"time"
2019-09-18 02:50:03 +00:00
2022-06-14 15:00:51 +00:00
lru "github.com/hashicorp/golang-lru"
blocks "github.com/ipfs/go-block-format"
bserv "github.com/ipfs/go-blockservice"
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
2022-08-25 18:20:41 +00:00
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
2022-06-14 15:00:51 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
"golang.org/x/xerrors"
2022-06-15 10:06:22 +00:00
"github.com/filecoin-project/go-address"
2022-02-10 00:40:27 +00:00
"github.com/filecoin-project/go-legs/dtsync"
2022-06-14 15:00:51 +00:00
2020-11-19 12:46:40 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain"
2021-09-02 16:07:23 +00:00
"github.com/filecoin-project/lotus/chain/consensus"
2020-11-19 12:46:40 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
"github.com/filecoin-project/lotus/chain/store"
2022-02-08 10:53:25 +00:00
"github.com/filecoin-project/lotus/chain/sub/ratelimit"
2020-11-19 12:46:40 +00:00
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/metrics"
"github.com/filecoin-project/lotus/node/impl/client"
2022-02-08 12:55:59 +00:00
"github.com/filecoin-project/lotus/node/impl/full"
2019-07-08 14:07:09 +00:00
)
var log = logging . Logger ( "sub" )
2020-10-14 19:23:30 +00:00
var msgCidPrefix = cid . Prefix {
Version : 1 ,
Codec : cid . DagCBOR ,
MhType : client . DefaultHashFunction ,
MhLength : 32 ,
}
2020-10-02 17:19:26 +00:00
func HandleIncomingBlocks ( ctx context . Context , bsub * pubsub . Subscription , s * chain . Syncer , bs bserv . BlockService , cmgr connmgr . ConnManager ) {
// Timeout after (block time + propagation delay). This is useless at
// this point.
timeout := time . Duration ( build . BlockDelaySecs + build . PropagationDelaySecs ) * time . Second
2019-07-08 14:07:09 +00:00
for {
msg , err := bsub . Next ( ctx )
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingBlocks loop" )
return
}
2019-07-28 05:35:32 +00:00
log . Error ( "error from block subscription: " , err )
2019-07-08 14:07:09 +00:00
continue
}
2020-02-17 05:51:18 +00:00
blk , ok := msg . ValidatorData . ( * types . BlockMsg )
if ! ok {
log . Warnf ( "pubsub block validator passed on wrong type: %#v" , msg . ValidatorData )
2020-05-05 13:35:03 +00:00
return
2020-01-07 20:41:26 +00:00
}
2020-06-02 14:29:39 +00:00
src := msg . GetFrom ( )
2020-05-05 13:49:43 +00:00
2019-07-08 14:07:09 +00:00
go func ( ) {
2020-10-02 17:19:26 +00:00
ctx , cancel := context . WithTimeout ( ctx , timeout )
defer cancel ( )
// NOTE: we could also share a single session between
// all requests but that may have other consequences.
ses := bserv . NewSession ( ctx , bs )
2020-07-10 14:43:14 +00:00
start := build . Clock . Now ( )
2019-08-27 18:45:21 +00:00
log . Debug ( "about to fetch messages for block from pubsub" )
2020-10-02 17:19:26 +00:00
bmsgs , err := FetchMessagesByCids ( ctx , ses , blk . BlsMessages )
2019-07-08 14:07:09 +00:00
if err != nil {
2021-05-31 22:30:10 +00:00
log . Errorf ( "failed to fetch all bls messages for block received over pubsub: %s; source: %s" , err , src )
2019-07-08 14:07:09 +00:00
return
}
2019-08-01 20:40:47 +00:00
2020-10-02 17:19:26 +00:00
smsgs , err := FetchSignedMessagesByCids ( ctx , ses , blk . SecpkMessages )
2019-08-01 20:40:47 +00:00
if err != nil {
2021-05-31 22:30:10 +00:00
log . Errorf ( "failed to fetch all secpk messages for block received over pubsub: %s; source: %s" , err , src )
2019-08-01 20:40:47 +00:00
return
}
2020-07-10 14:43:14 +00:00
took := build . Clock . Since ( start )
2020-11-03 12:28:31 +00:00
log . Debugw ( "new block over pubsub" , "cid" , blk . Header . Cid ( ) , "source" , msg . GetFrom ( ) , "msgfetch" , took )
if took > 3 * time . Second {
log . Warnw ( "Slow msg fetch" , "cid" , blk . Header . Cid ( ) , "source" , msg . GetFrom ( ) , "msgfetch" , took )
}
2020-07-10 14:43:14 +00:00
if delay := build . Clock . Now ( ) . Unix ( ) - int64 ( blk . Header . Timestamp ) ; delay > 5 {
2020-12-10 14:48:37 +00:00
_ = stats . RecordWithTags ( ctx ,
[ ] tag . Mutator { tag . Insert ( metrics . MinerID , blk . Header . Miner . String ( ) ) } ,
metrics . BlockDelay . M ( delay ) ,
)
2021-03-05 06:09:07 +00:00
log . Warnw ( "received block with large delay from miner" , "block" , blk . Cid ( ) , "delay" , delay , "miner" , blk . Header . Miner )
2019-12-07 10:49:05 +00:00
}
2019-12-17 07:06:48 +00:00
2019-12-19 06:19:15 +00:00
if s . InformNewBlock ( msg . ReceivedFrom , & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : blk . Header ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-12-17 07:06:48 +00:00
} ) {
2019-12-19 06:19:15 +00:00
cmgr . TagPeer ( msg . ReceivedFrom , "blkprop" , 5 )
2019-12-17 07:06:48 +00:00
}
2019-07-08 14:07:09 +00:00
} ( )
}
}
2020-07-27 15:31:36 +00:00
func FetchMessagesByCids (
ctx context . Context ,
2020-10-02 17:19:26 +00:00
bserv bserv . BlockGetter ,
2020-07-27 15:31:36 +00:00
cids [ ] cid . Cid ,
) ( [ ] * types . Message , error ) {
out := make ( [ ] * types . Message , len ( cids ) )
err := fetchCids ( ctx , bserv , cids , func ( i int , b blocks . Block ) error {
msg , err := types . DecodeMessage ( b . RawData ( ) )
if err != nil {
return err
}
out [ i ] = msg
return nil
} )
if err != nil {
return nil , err
}
return out , nil
}
// FIXME: Duplicate of above.
func FetchSignedMessagesByCids (
ctx context . Context ,
2020-10-02 17:19:26 +00:00
bserv bserv . BlockGetter ,
2020-07-27 15:31:36 +00:00
cids [ ] cid . Cid ,
) ( [ ] * types . SignedMessage , error ) {
out := make ( [ ] * types . SignedMessage , len ( cids ) )
err := fetchCids ( ctx , bserv , cids , func ( i int , b blocks . Block ) error {
smsg , err := types . DecodeSignedMessage ( b . RawData ( ) )
if err != nil {
return err
}
out [ i ] = smsg
return nil
} )
if err != nil {
return nil , err
}
return out , nil
}
// Fetch `cids` from the block service, apply `cb` on each of them. Used
// by the fetch message functions above.
// We check that each block is received only once and we do not received
// blocks we did not request.
func fetchCids (
ctx context . Context ,
2020-10-02 17:19:26 +00:00
bserv bserv . BlockGetter ,
2020-07-27 15:31:36 +00:00
cids [ ] cid . Cid ,
cb func ( int , blocks . Block ) error ,
) error {
2020-10-08 20:49:36 +00:00
ctx , cancel := context . WithCancel ( ctx )
defer cancel ( )
2020-07-27 15:31:36 +00:00
cidIndex := make ( map [ cid . Cid ] int )
for i , c := range cids {
2020-10-14 19:23:30 +00:00
if c . Prefix ( ) != msgCidPrefix {
2022-02-08 10:53:25 +00:00
return xerrors . Errorf ( "invalid msg CID: %s" , c )
2020-10-14 19:23:30 +00:00
}
2020-07-27 15:31:36 +00:00
cidIndex [ c ] = i
}
2020-10-08 20:49:36 +00:00
if len ( cids ) != len ( cidIndex ) {
2022-02-08 10:53:25 +00:00
return xerrors . Errorf ( "duplicate CIDs in fetchCids input" )
2020-10-08 20:49:36 +00:00
}
2020-10-09 18:06:49 +00:00
for block := range bserv . GetBlocks ( ctx , cids ) {
ix , ok := cidIndex [ block . Cid ( ) ]
if ! ok {
// Ignore duplicate/unexpected blocks. This shouldn't
// happen, but we can be safe.
log . Errorw ( "received duplicate/unexpected block when syncing" , "cid" , block . Cid ( ) )
continue
}
2020-07-27 15:31:36 +00:00
2020-10-09 18:06:49 +00:00
// Record that we've received the block.
delete ( cidIndex , block . Cid ( ) )
2020-07-27 15:31:36 +00:00
2020-10-09 18:06:49 +00:00
if err := cb ( ix , block ) ; err != nil {
return err
}
}
2020-07-27 15:31:36 +00:00
2020-10-09 18:06:49 +00:00
if len ( cidIndex ) > 0 {
err := ctx . Err ( )
if err == nil {
2022-02-08 10:53:25 +00:00
err = xerrors . Errorf ( "failed to fetch %d messages for unknown reasons" , len ( cidIndex ) )
2020-07-27 15:31:36 +00:00
}
2020-10-09 18:06:49 +00:00
return err
2020-07-27 15:31:36 +00:00
}
return nil
}
2020-02-17 05:51:18 +00:00
type BlockValidator struct {
2020-08-16 17:46:19 +00:00
self peer . ID
2020-02-17 05:51:18 +00:00
peers * lru . TwoQueueCache
killThresh int
recvBlocks * blockReceiptCache
blacklist func ( peer . ID )
2020-05-12 18:05:29 +00:00
// necessary for block validation
2021-09-02 16:07:23 +00:00
chain * store . ChainStore
consensus consensus . Consensus
2020-02-17 05:51:18 +00:00
}
2021-09-02 16:07:23 +00:00
func NewBlockValidator ( self peer . ID , chain * store . ChainStore , cns consensus . Consensus , blacklist func ( peer . ID ) ) * BlockValidator {
2020-02-17 05:51:18 +00:00
p , _ := lru . New2Q ( 4096 )
return & BlockValidator {
2020-08-16 17:46:19 +00:00
self : self ,
2020-02-17 05:51:18 +00:00
peers : p ,
2020-05-05 13:49:43 +00:00
killThresh : 10 ,
2020-02-17 05:51:18 +00:00
blacklist : blacklist ,
recvBlocks : newBlockReceiptCache ( ) ,
2020-05-12 18:05:29 +00:00
chain : chain ,
2021-09-02 16:07:23 +00:00
consensus : cns ,
2020-02-17 05:51:18 +00:00
}
}
func ( bv * BlockValidator ) flagPeer ( p peer . ID ) {
v , ok := bv . peers . Get ( p )
if ! ok {
bv . peers . Add ( p , int ( 1 ) )
return
}
val := v . ( int )
if val >= bv . killThresh {
2020-05-05 13:49:43 +00:00
log . Warnf ( "blacklisting peer %s" , p )
2020-02-17 05:51:18 +00:00
bv . blacklist ( p )
2020-05-05 13:49:43 +00:00
return
2020-02-17 05:51:18 +00:00
}
bv . peers . Add ( p , v . ( int ) + 1 )
}
2021-09-02 16:07:23 +00:00
func ( bv * BlockValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) ( res pubsub . ValidationResult ) {
2020-05-22 14:21:37 +00:00
defer func ( ) {
2021-09-02 16:07:23 +00:00
if rerr := recover ( ) ; rerr != nil {
err := xerrors . Errorf ( "validate block: %s" , rerr )
recordFailure ( ctx , metrics . BlockValidationFailure , err . Error ( ) )
bv . flagPeer ( pid )
res = pubsub . ValidationReject
return
}
2020-05-22 14:21:37 +00:00
} ( )
2021-09-02 16:07:23 +00:00
var what string
res , what = bv . consensus . ValidateBlockPubsub ( ctx , pid == bv . self , msg )
if res == pubsub . ValidationAccept {
// it's a good block! make sure we've only seen it once
if count := bv . recvBlocks . add ( msg . ValidatorData . ( * types . BlockMsg ) . Cid ( ) ) ; count > 0 {
if pid == bv . self {
log . Warnf ( "local block has been seen %d times; ignoring" , count )
}
2020-02-17 05:51:18 +00:00
2021-09-02 16:07:23 +00:00
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
return pubsub . ValidationIgnore
2020-05-12 19:26:25 +00:00
}
2021-09-02 16:07:23 +00:00
} else {
2020-11-19 12:46:40 +00:00
recordFailure ( ctx , metrics . BlockValidationFailure , what )
2020-07-27 23:51:30 +00:00
}
2020-05-12 18:13:30 +00:00
2021-09-02 16:07:23 +00:00
return res
2020-05-12 18:05:29 +00:00
}
2020-02-17 05:51:18 +00:00
// blockReceiptCache counts how many times each block CID has been added.
type blockReceiptCache struct {
	// blocks maps a block CID to the number of times add was called for it.
	blocks *lru.TwoQueueCache
}
// newBlockReceiptCache returns an empty receipt cache backed by an
// 8192-entry 2Q cache.
func newBlockReceiptCache() *blockReceiptCache {
	// The construction error is discarded here.
	// NOTE(review): presumably New2Q can only fail for a non-positive
	// size, which a constant 8192 rules out — confirm.
	store, _ := lru.New2Q(8192)

	return &blockReceiptCache{blocks: store}
}
// add registers one more sighting of bcid and returns how many times it had
// been seen before this call (0 for a CID not currently in the cache).
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
	prev := 0
	if stored, found := brc.blocks.Get(bcid); found {
		prev = stored.(int)
	}

	brc.blocks.Add(bcid, prev+1)
	return prev
}
2020-02-28 01:39:07 +00:00
type MessageValidator struct {
2020-08-16 17:46:19 +00:00
self peer . ID
2020-02-28 01:39:07 +00:00
mpool * messagepool . MessagePool
}
2020-08-16 17:46:19 +00:00
func NewMessageValidator ( self peer . ID , mp * messagepool . MessagePool ) * MessageValidator {
return & MessageValidator { self : self , mpool : mp }
2020-02-28 01:39:07 +00:00
}
2020-05-05 13:35:03 +00:00
func ( mv * MessageValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
2020-08-16 17:46:19 +00:00
if pid == mv . self {
2020-08-17 06:04:22 +00:00
return mv . validateLocalMessage ( ctx , msg )
2020-08-16 17:46:19 +00:00
}
2021-06-11 11:19:26 +00:00
start := time . Now ( )
defer func ( ) {
ms := time . Now ( ) . Sub ( start ) . Microseconds ( )
stats . Record ( ctx , metrics . MessageValidationDuration . M ( float64 ( ms ) / 1000 ) )
} ( )
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageReceived . M ( 1 ) )
2020-02-28 01:39:07 +00:00
m , err := types . DecodeSignedMessage ( msg . Message . GetData ( ) )
if err != nil {
log . Warnf ( "failed to decode incoming message: %s" , err )
2020-03-02 00:57:16 +00:00
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , "decode" ) )
stats . Record ( ctx , metrics . MessageValidationFailure . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-28 01:39:07 +00:00
}
2021-05-18 18:56:42 +00:00
if err := mv . mpool . Add ( ctx , m ) ; err != nil {
2020-03-01 04:15:02 +00:00
log . Debugf ( "failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s" , m . Message . From , m . Message . To , m . Message . Nonce , types . FIL ( m . Message . Value ) , err )
2020-03-02 00:26:09 +00:00
ctx , _ = tag . New (
ctx ,
2020-08-28 06:11:24 +00:00
tag . Upsert ( metrics . Local , "false" ) ,
2020-03-02 00:26:09 +00:00
)
2020-08-28 07:01:45 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "add" )
2020-05-14 10:31:45 +00:00
switch {
2020-08-26 12:16:04 +00:00
case xerrors . Is ( err , messagepool . ErrSoftValidationFailure ) :
2020-08-19 10:20:16 +00:00
fallthrough
case xerrors . Is ( err , messagepool . ErrRBFTooLowPremium ) :
fallthrough
2020-08-26 19:48:03 +00:00
case xerrors . Is ( err , messagepool . ErrTooManyPendingMessages ) :
fallthrough
2020-09-01 14:59:44 +00:00
case xerrors . Is ( err , messagepool . ErrNonceGap ) :
fallthrough
2020-08-19 10:20:16 +00:00
case xerrors . Is ( err , messagepool . ErrNonceTooLow ) :
2020-05-14 10:31:45 +00:00
return pubsub . ValidationIgnore
default :
return pubsub . ValidationReject
2020-05-05 13:35:03 +00:00
}
2020-02-28 01:39:07 +00:00
}
2021-06-11 11:19:26 +00:00
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . MsgValid , "true" ) ,
)
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageValidationSuccess . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationAccept
2020-02-28 01:39:07 +00:00
}
2020-08-17 06:04:22 +00:00
func ( mv * MessageValidator ) validateLocalMessage ( ctx context . Context , msg * pubsub . Message ) pubsub . ValidationResult {
2020-08-28 06:11:24 +00:00
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . Local , "true" ) ,
)
2021-06-11 11:19:26 +00:00
start := time . Now ( )
defer func ( ) {
ms := time . Now ( ) . Sub ( start ) . Microseconds ( )
stats . Record ( ctx , metrics . MessageValidationDuration . M ( float64 ( ms ) / 1000 ) )
} ( )
2020-08-17 06:04:22 +00:00
// do some lightweight validation
stats . Record ( ctx , metrics . MessagePublished . M ( 1 ) )
m , err := types . DecodeSignedMessage ( msg . Message . GetData ( ) )
if err != nil {
log . Warnf ( "failed to decode local message: %s" , err )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "decode" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
2021-06-09 07:19:35 +00:00
if m . Size ( ) > messagepool . MaxMessageSize {
2020-08-17 06:04:22 +00:00
log . Warnf ( "local message is too large! (%dB)" , m . Size ( ) )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "oversize" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if m . Message . To == address . Undef {
log . Warn ( "local message has invalid destination address" )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "undef-addr" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if ! m . Message . Value . LessThan ( types . TotalFilecoinInt ) {
log . Warnf ( "local messages has too high value: %s" , m . Message . Value )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "value-too-high" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if err := mv . mpool . VerifyMsgSig ( m ) ; err != nil {
log . Warnf ( "signature verification failed for local message: %s" , err )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "verify-sig" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
2021-06-11 11:19:26 +00:00
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . MsgValid , "true" ) ,
)
2020-08-17 06:04:22 +00:00
stats . Record ( ctx , metrics . MessageValidationSuccess . M ( 1 ) )
return pubsub . ValidationAccept
}
2019-12-01 23:11:43 +00:00
func HandleIncomingMessages ( ctx context . Context , mpool * messagepool . MessagePool , msub * pubsub . Subscription ) {
2019-07-08 14:07:09 +00:00
for {
2020-02-28 01:39:07 +00:00
_ , err := msub . Next ( ctx )
2019-07-08 14:07:09 +00:00
if err != nil {
2019-09-17 14:23:08 +00:00
log . Warn ( "error from message subscription: " , err )
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingMessages loop" )
return
}
2019-07-08 14:07:09 +00:00
continue
}
2020-02-28 01:39:07 +00:00
// Do nothing... everything happens in validate
2019-07-08 14:07:09 +00:00
}
}
2020-08-28 06:53:59 +00:00
func recordFailure ( ctx context . Context , metric * stats . Int64Measure , failureType string ) {
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . FailureType , failureType ) ,
)
stats . Record ( ctx , metric . M ( 1 ) )
2020-08-28 07:01:45 +00:00
}
2022-02-03 22:56:21 +00:00
2022-02-08 10:53:25 +00:00
type peerMsgInfo struct {
peerID peer . ID
lastCid cid . Cid
2022-02-10 00:21:05 +00:00
lastSeqno uint64
2022-02-08 10:53:25 +00:00
rateLimit * ratelimit . Window
mutex sync . Mutex
}
2022-02-03 22:56:21 +00:00
type IndexerMessageValidator struct {
self peer . ID
2022-02-08 10:53:25 +00:00
peerCache * lru . TwoQueueCache
2022-02-08 12:55:59 +00:00
chainApi full . ChainModuleAPI
stateApi full . StateModuleAPI
2022-02-03 22:56:21 +00:00
}
2022-02-08 12:55:59 +00:00
func NewIndexerMessageValidator ( self peer . ID , chainApi full . ChainModuleAPI , stateApi full . StateModuleAPI ) * IndexerMessageValidator {
2022-02-09 18:29:49 +00:00
peerCache , _ := lru . New2Q ( 8192 )
2022-02-08 10:53:25 +00:00
return & IndexerMessageValidator {
self : self ,
peerCache : peerCache ,
2022-02-08 12:55:59 +00:00
chainApi : chainApi ,
stateApi : stateApi ,
2022-02-08 10:53:25 +00:00
}
2022-02-03 22:56:21 +00:00
}
func ( v * IndexerMessageValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
// This chain-node should not be publishing its own messages. These are
2022-02-08 10:53:25 +00:00
// relayed from market-nodes. If a node appears to be local, reject it.
2022-02-03 22:56:21 +00:00
if pid == v . self {
2022-02-10 16:44:40 +00:00
log . Debug ( "ignoring indexer message from self" )
2022-02-03 22:56:21 +00:00
stats . Record ( ctx , metrics . IndexerMessageValidationFailure . M ( 1 ) )
2022-02-10 16:44:40 +00:00
return pubsub . ValidationIgnore
2022-02-03 22:56:21 +00:00
}
2022-02-08 10:53:25 +00:00
originPeer := msg . GetFrom ( )
if originPeer == v . self {
2022-02-10 16:44:40 +00:00
log . Debug ( "ignoring indexer message originating from self" )
2022-02-08 10:53:25 +00:00
stats . Record ( ctx , metrics . IndexerMessageValidationFailure . M ( 1 ) )
2022-02-10 16:44:40 +00:00
return pubsub . ValidationIgnore
2022-02-08 10:53:25 +00:00
}
2022-02-10 00:40:27 +00:00
idxrMsg := dtsync . Message { }
err := idxrMsg . UnmarshalCBOR ( bytes . NewBuffer ( msg . Data ) )
2022-02-08 10:53:25 +00:00
if err != nil {
2022-02-10 00:40:27 +00:00
log . Errorw ( "Could not decode indexer pubsub message" , "err" , err )
2022-02-08 10:53:25 +00:00
return pubsub . ValidationReject
}
2022-02-10 00:40:27 +00:00
if len ( idxrMsg . ExtraData ) == 0 {
2022-02-09 18:29:49 +00:00
log . Debugw ( "ignoring messsage missing miner id" , "peer" , originPeer )
2022-02-08 10:53:25 +00:00
return pubsub . ValidationIgnore
}
2022-02-11 18:30:29 +00:00
// Get miner info from lotus
minerAddr , err := address . NewFromBytes ( idxrMsg . ExtraData )
if err != nil {
log . Warnw ( "cannot parse extra data as miner address" , "err" , err , "extraData" , idxrMsg . ExtraData )
return pubsub . ValidationReject
}
minerID := minerAddr . String ( )
2022-02-10 00:40:27 +00:00
msgCid := idxrMsg . Cid
2022-02-08 10:53:25 +00:00
var msgInfo * peerMsgInfo
val , ok := v . peerCache . Get ( minerID )
if ! ok {
msgInfo = & peerMsgInfo { }
} else {
msgInfo = val . ( * peerMsgInfo )
}
// Lock this peer's message info.
msgInfo . mutex . Lock ( )
defer msgInfo . mutex . Unlock ( )
2022-02-09 19:06:56 +00:00
if ok {
// Reject replayed messages.
2022-02-10 00:21:05 +00:00
seqno := binary . BigEndian . Uint64 ( msg . Message . GetSeqno ( ) )
if seqno <= msgInfo . lastSeqno {
log . Debugf ( "ignoring replayed indexer message" )
return pubsub . ValidationIgnore
2022-02-09 19:06:56 +00:00
}
msgInfo . lastSeqno = seqno
}
2022-02-08 10:53:25 +00:00
if ! ok || originPeer != msgInfo . peerID {
2022-02-08 12:55:59 +00:00
// Check that the miner ID maps to the peer that sent the message.
2022-02-11 18:30:29 +00:00
err = v . authenticateMessage ( ctx , minerAddr , originPeer )
2022-02-08 10:53:25 +00:00
if err != nil {
log . Warnw ( "cannot authenticate messsage" , "err" , err , "peer" , originPeer , "minerID" , minerID )
stats . Record ( ctx , metrics . IndexerMessageValidationFailure . M ( 1 ) )
return pubsub . ValidationReject
}
msgInfo . peerID = originPeer
if ! ok {
// Add msgInfo to cache only after being authenticated. If two
// messages from the same peer are handled concurrently, there is a
// small chance that one msgInfo could replace the other here when
// the info is first cached. This is OK, so no need to prevent it.
v . peerCache . Add ( minerID , msgInfo )
}
}
// See if message needs to be ignored due to rate limiting.
if v . rateLimitPeer ( msgInfo , msgCid ) {
return pubsub . ValidationIgnore
}
2022-02-03 22:56:21 +00:00
stats . Record ( ctx , metrics . IndexerMessageValidationSuccess . M ( 1 ) )
return pubsub . ValidationAccept
}
2022-02-08 10:53:25 +00:00
// rateLimitPeer reports whether the message identified by msgCid must be
// ignored for the miner described by msgInfo. A message is ignored when it
// repeats the previous message's CID within repeatTimeLimit of the newest
// entry in the rate window, or when adding it to the rate window (msgLimit
// messages per msgTimeLimit) fails. The rate window is created on first use.
// When the message is let through, msgInfo.lastCid is updated to msgCid.
func (v *IndexerMessageValidator) rateLimitPeer(msgInfo *peerMsgInfo, msgCid cid.Cid) bool {
	const (
		msgLimit        = 5
		msgTimeLimit    = 10 * time.Second
		repeatTimeLimit = 2 * time.Hour
	)

	window := msgInfo.rateLimit
	switch {
	case window == nil:
		// First message seen for this entry: start a fresh window.
		window = ratelimit.NewWindow(msgLimit, msgTimeLimit)
		msgInfo.rateLimit = window
	case msgInfo.lastCid == msgCid && time.Since(window.Newest()) < repeatTimeLimit:
		// Same message data as last time, repeated too soon.
		log.Warnw("ignoring repeated indexer message", "sender", msgInfo.peerID)
		return true
	}

	// Check overall message rate.
	if err := window.Add(); err != nil {
		log.Warnw("ignoring indexer message", "sender", msgInfo.peerID, "err", err)
		return true
	}

	msgInfo.lastCid = msgCid
	return false
}
2022-02-11 18:30:29 +00:00
func ( v * IndexerMessageValidator ) authenticateMessage ( ctx context . Context , minerAddress address . Address , peerID peer . ID ) error {
2022-02-08 12:55:59 +00:00
ts , err := v . chainApi . ChainHead ( ctx )
2022-02-08 10:53:25 +00:00
if err != nil {
return err
}
2022-02-08 12:55:59 +00:00
minerInfo , err := v . stateApi . StateMinerInfo ( ctx , minerAddress , ts . Key ( ) )
2022-02-08 10:53:25 +00:00
if err != nil {
return err
}
if minerInfo . PeerId == nil {
return xerrors . New ( "no peer id for miner" )
}
if * minerInfo . PeerId != peerID {
2022-02-08 12:55:59 +00:00
return xerrors . New ( "miner id does not map to peer that sent message" )
2022-02-08 10:53:25 +00:00
}
2022-02-08 12:55:59 +00:00
2022-02-08 10:53:25 +00:00
return nil
}