2019-07-08 14:07:09 +00:00
package sub
import (
2020-05-12 18:05:29 +00:00
"bytes"
2019-07-08 14:07:09 +00:00
"context"
2020-08-27 13:53:53 +00:00
"errors"
2020-05-12 19:26:25 +00:00
"fmt"
2020-05-12 18:13:30 +00:00
"sync"
2019-12-07 10:49:05 +00:00
"time"
2019-09-18 02:50:03 +00:00
2020-04-01 01:34:23 +00:00
"golang.org/x/xerrors"
2020-05-12 18:05:29 +00:00
address "github.com/filecoin-project/go-address"
miner "github.com/filecoin-project/specs-actors/actors/builtin/miner"
2020-07-23 00:14:54 +00:00
"github.com/filecoin-project/specs-actors/actors/util/adt"
2020-02-17 05:51:18 +00:00
lru "github.com/hashicorp/golang-lru"
2020-08-07 13:04:10 +00:00
blocks "github.com/ipfs/go-block-format"
bserv "github.com/ipfs/go-blockservice"
2020-02-17 05:51:18 +00:00
"github.com/ipfs/go-cid"
2020-05-12 18:05:29 +00:00
cbor "github.com/ipfs/go-ipld-cbor"
2020-01-08 19:10:57 +00:00
logging "github.com/ipfs/go-log/v2"
2019-12-17 07:06:48 +00:00
connmgr "github.com/libp2p/go-libp2p-core/connmgr"
2020-05-05 14:12:06 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-08 14:07:09 +00:00
pubsub "github.com/libp2p/go-libp2p-pubsub"
2020-05-12 19:26:25 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
2020-03-02 00:26:09 +00:00
"go.opencensus.io/stats"
"go.opencensus.io/tag"
2019-07-08 15:14:36 +00:00
2020-01-07 20:41:26 +00:00
"github.com/filecoin-project/lotus/build"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2020-05-12 18:05:29 +00:00
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/types"
2020-07-23 02:05:11 +00:00
"github.com/filecoin-project/lotus/lib/blockstore"
2020-05-12 18:05:29 +00:00
"github.com/filecoin-project/lotus/lib/bufbstore"
2020-05-12 19:26:25 +00:00
"github.com/filecoin-project/lotus/lib/sigs"
2020-03-02 00:26:09 +00:00
"github.com/filecoin-project/lotus/metrics"
2019-07-08 14:07:09 +00:00
)
var log = logging . Logger ( "sub" )
2020-08-27 13:53:53 +00:00
var ErrSoftFailure = errors . New ( "soft validation failure" )
var ErrInsufficientPower = errors . New ( "incoming block's miner does not have minimum power" )
2020-07-27 15:31:36 +00:00
func HandleIncomingBlocks ( ctx context . Context , bsub * pubsub . Subscription , s * chain . Syncer , bserv bserv . BlockService , cmgr connmgr . ConnManager ) {
2019-07-08 14:07:09 +00:00
for {
msg , err := bsub . Next ( ctx )
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingBlocks loop" )
return
}
2019-07-28 05:35:32 +00:00
log . Error ( "error from block subscription: " , err )
2019-07-08 14:07:09 +00:00
continue
}
2020-02-17 05:51:18 +00:00
blk , ok := msg . ValidatorData . ( * types . BlockMsg )
if ! ok {
log . Warnf ( "pubsub block validator passed on wrong type: %#v" , msg . ValidatorData )
2020-05-05 13:35:03 +00:00
return
2020-01-07 20:41:26 +00:00
}
2020-06-02 14:29:39 +00:00
src := msg . GetFrom ( )
2020-05-05 13:49:43 +00:00
2019-07-08 14:07:09 +00:00
go func ( ) {
2020-07-10 14:43:14 +00:00
start := build . Clock . Now ( )
2019-08-27 18:45:21 +00:00
log . Debug ( "about to fetch messages for block from pubsub" )
2020-07-27 15:31:36 +00:00
bmsgs , err := FetchMessagesByCids ( context . TODO ( ) , bserv , blk . BlsMessages )
2019-07-08 14:07:09 +00:00
if err != nil {
2020-05-12 16:29:06 +00:00
log . Errorf ( "failed to fetch all bls messages for block received over pubusb: %s; source: %s" , err , src )
2019-07-08 14:07:09 +00:00
return
}
2019-08-01 20:40:47 +00:00
2020-07-27 15:31:36 +00:00
smsgs , err := FetchSignedMessagesByCids ( context . TODO ( ) , bserv , blk . SecpkMessages )
2019-08-01 20:40:47 +00:00
if err != nil {
2020-05-12 16:29:06 +00:00
log . Errorf ( "failed to fetch all secpk messages for block received over pubusb: %s; source: %s" , err , src )
2019-08-01 20:40:47 +00:00
return
}
2020-07-10 14:43:14 +00:00
took := build . Clock . Since ( start )
2019-12-11 20:41:24 +00:00
log . Infow ( "new block over pubsub" , "cid" , blk . Header . Cid ( ) , "source" , msg . GetFrom ( ) , "msgfetch" , took )
2020-07-10 14:43:14 +00:00
if delay := build . Clock . Now ( ) . Unix ( ) - int64 ( blk . Header . Timestamp ) ; delay > 5 {
2019-12-07 10:49:05 +00:00
log . Warnf ( "Received block with large delay %d from miner %s" , delay , blk . Header . Miner )
}
2019-12-17 07:06:48 +00:00
2019-12-19 06:19:15 +00:00
if s . InformNewBlock ( msg . ReceivedFrom , & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : blk . Header ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-12-17 07:06:48 +00:00
} ) {
2019-12-19 06:19:15 +00:00
cmgr . TagPeer ( msg . ReceivedFrom , "blkprop" , 5 )
2019-12-17 07:06:48 +00:00
}
2019-07-08 14:07:09 +00:00
} ( )
}
}
2020-07-27 15:31:36 +00:00
// FetchMessagesByCids fetches the blocks identified by cids through bserv and
// decodes each one as an unsigned message. The returned slice has the same
// length and order as cids. Any fetch or decode error, or a CID delivered
// more than once, aborts the call and returns a nil slice.
func FetchMessagesByCids(
	ctx context.Context,
	bserv bserv.BlockService,
	cids []cid.Cid,
) ([]*types.Message, error) {
	msgs := make([]*types.Message, len(cids))

	// FIXME: fetchCids already resolves the position of every block; passing
	// the index back here duplicates that work.
	store := func(idx int, blk blocks.Block) error {
		decoded, err := types.DecodeMessage(blk.RawData())
		if err != nil {
			return err
		}
		// A slot that is already filled means the same CID arrived twice.
		if msgs[idx] != nil {
			return fmt.Errorf("received duplicate message")
		}
		msgs[idx] = decoded
		return nil
	}

	if err := fetchCids(ctx, bserv, cids, store); err != nil {
		return nil, err
	}
	return msgs, nil
}
// FetchSignedMessagesByCids fetches the blocks identified by cids through
// bserv and decodes each one as a signed message. The returned slice has the
// same length and order as cids. Any fetch or decode error, or a CID
// delivered more than once, aborts the call and returns a nil slice.
//
// FIXME: near-duplicate of FetchMessagesByCids; only the decoder differs.
func FetchSignedMessagesByCids(
	ctx context.Context,
	bserv bserv.BlockService,
	cids []cid.Cid,
) ([]*types.SignedMessage, error) {
	signed := make([]*types.SignedMessage, len(cids))

	store := func(idx int, blk blocks.Block) error {
		decoded, err := types.DecodeSignedMessage(blk.RawData())
		if err != nil {
			return err
		}
		// A slot that is already filled means the same CID arrived twice.
		if signed[idx] != nil {
			return fmt.Errorf("received duplicate message")
		}
		signed[idx] = decoded
		return nil
	}

	if err := fetchCids(ctx, bserv, cids, store); err != nil {
		return nil, err
	}
	return signed, nil
}
// fetchCids fetches `cids` from the block service and applies `cb` to each
// received block, passing the position of the block's CID within `cids`.
// It is used by the fetch message functions above.
//
// It returns nil only when exactly one block was received for every requested
// CID. It returns an error when:
//   - `cids` contains the same CID more than once,
//   - a block arrives that was not requested, or arrives twice,
//   - `cb` returns an error (that error is returned unchanged),
//   - the result channel is closed, or `ctx` is cancelled, before every
//     requested block has been received.
//
// The fetch runs on a context derived from `ctx` that is cancelled when this
// function returns, so an early return does not leave the fetch running.
func fetchCids(
	ctx context.Context,
	bserv bserv.BlockService,
	cids []cid.Cid,
	cb func(int, blocks.Block) error,
) error {
	if len(cids) == 0 {
		return nil
	}

	cidIndex := make(map[cid.Cid]int, len(cids))
	for i, c := range cids {
		cidIndex[c] = i
	}
	// Duplicate input CIDs collapse in the map; some positions could then
	// never be filled, so refuse the request up front.
	if len(cidIndex) != len(cids) {
		return fmt.Errorf("duplicate CIDs in fetchCids input")
	}

	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	fetchedBlocks := bserv.GetBlocks(ctx, cids)
	seen := make([]bool, len(cids))

	for received := 0; received < len(cids); received++ {
		select {
		case <-ctx.Done():
			return fmt.Errorf("failed to fetch all messages: %w", ctx.Err())
		case block, ok := <-fetchedBlocks:
			if !ok {
				// Channel closed before every requested block arrived.
				if err := ctx.Err(); err != nil {
					return fmt.Errorf("failed to fetch all messages: %w", err)
				}
				return fmt.Errorf("failed to fetch all messages")
			}

			ix, ok := cidIndex[block.Cid()]
			if !ok {
				return fmt.Errorf("received message we didnt ask for")
			}
			if seen[ix] {
				return fmt.Errorf("received duplicate message")
			}
			seen[ix] = true

			if err := cb(ix, block); err != nil {
				return err
			}
		}
	}

	return nil
}
2020-02-17 05:51:18 +00:00
type BlockValidator struct {
2020-08-16 17:46:19 +00:00
self peer . ID
2020-02-17 05:51:18 +00:00
peers * lru . TwoQueueCache
killThresh int
recvBlocks * blockReceiptCache
blacklist func ( peer . ID )
2020-05-12 18:05:29 +00:00
// necessary for block validation
chain * store . ChainStore
stmgr * stmgr . StateManager
2020-05-12 18:13:30 +00:00
mx sync . Mutex
keycache map [ string ] address . Address
2020-02-17 05:51:18 +00:00
}
2020-08-16 17:46:19 +00:00
func NewBlockValidator ( self peer . ID , chain * store . ChainStore , stmgr * stmgr . StateManager , blacklist func ( peer . ID ) ) * BlockValidator {
2020-02-17 05:51:18 +00:00
p , _ := lru . New2Q ( 4096 )
return & BlockValidator {
2020-08-16 17:46:19 +00:00
self : self ,
2020-02-17 05:51:18 +00:00
peers : p ,
2020-05-05 13:49:43 +00:00
killThresh : 10 ,
2020-02-17 05:51:18 +00:00
blacklist : blacklist ,
recvBlocks : newBlockReceiptCache ( ) ,
2020-05-12 18:05:29 +00:00
chain : chain ,
stmgr : stmgr ,
2020-05-12 19:37:01 +00:00
keycache : make ( map [ string ] address . Address ) ,
2020-02-17 05:51:18 +00:00
}
}
func ( bv * BlockValidator ) flagPeer ( p peer . ID ) {
v , ok := bv . peers . Get ( p )
if ! ok {
bv . peers . Add ( p , int ( 1 ) )
return
}
val := v . ( int )
if val >= bv . killThresh {
2020-05-05 13:49:43 +00:00
log . Warnf ( "blacklisting peer %s" , p )
2020-02-17 05:51:18 +00:00
bv . blacklist ( p )
2020-05-05 13:49:43 +00:00
return
2020-02-17 05:51:18 +00:00
}
bv . peers . Add ( p , v . ( int ) + 1 )
}
2020-05-05 13:35:03 +00:00
func ( bv * BlockValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
2020-08-16 17:46:19 +00:00
if pid == bv . self {
2020-08-17 06:04:22 +00:00
return bv . validateLocalBlock ( ctx , msg )
2020-08-16 17:46:19 +00:00
}
2020-05-22 14:21:37 +00:00
// track validation time
2020-07-10 14:43:14 +00:00
begin := build . Clock . Now ( )
2020-05-22 14:21:37 +00:00
defer func ( ) {
2020-07-10 14:43:14 +00:00
log . Debugf ( "block validation time: %s" , build . Clock . Since ( begin ) )
2020-05-22 14:21:37 +00:00
} ( )
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . BlockReceived . M ( 1 ) )
2020-05-12 18:05:29 +00:00
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer := func ( what string ) {
2020-08-28 07:11:59 +00:00
recordFailure ( ctx , metrics . BlockValidationFailure , what )
2020-05-12 19:26:25 +00:00
bv . flagPeer ( pid )
}
2020-08-17 18:03:08 +00:00
blk , what , err := bv . decodeAndCheckBlock ( msg )
2020-02-17 05:51:18 +00:00
if err != nil {
log . Error ( "got invalid block over pubsub: " , err )
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer ( what )
2020-05-12 19:26:25 +00:00
return pubsub . ValidationReject
}
// validate the block meta: the Message CID in the header must match the included messages
err = bv . validateMsgMeta ( ctx , blk )
if err != nil {
log . Warnf ( "error validating message metadata: %s" , err )
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer ( "invalid_block_meta" )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-17 05:51:18 +00:00
}
2020-05-12 18:05:29 +00:00
// we want to ensure that it is a block from a known miner; we reject blocks from unknown miners
// to prevent spam attacks.
// the logic works as follows: we lookup the miner in the chain for its key.
// if we can find it then it's a known miner and we can validate the signature.
// if we can't find it, we check whether we are (near) synced in the chain.
// if we are not synced we cannot validate the block and we must ignore it.
2020-05-12 19:26:25 +00:00
// if we are synced and the miner is unknown, then the block is rejcected.
2020-07-27 23:51:30 +00:00
key , err := bv . checkPowerAndGetWorkerKey ( ctx , blk . Header )
2020-05-12 19:26:25 +00:00
if err != nil {
2020-08-27 13:53:53 +00:00
if err != ErrSoftFailure && bv . isChainNearSynced ( ) {
2020-07-27 23:51:30 +00:00
log . Warnf ( "received block from unknown miner or miner that doesn't meet min power over pubsub; rejecting message" )
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer ( "unknown_miner" )
2020-05-12 19:26:25 +00:00
return pubsub . ValidationReject
}
2020-08-27 13:53:53 +00:00
2020-08-20 04:49:10 +00:00
log . Warnf ( "cannot validate block message; unknown miner or miner that doesn't meet min power in unsynced chain" )
return pubsub . ValidationIgnore
2020-05-12 19:26:25 +00:00
}
2020-05-12 18:05:29 +00:00
2020-06-02 14:29:39 +00:00
err = sigs . CheckBlockSignature ( ctx , blk . Header , key )
2020-05-12 19:26:25 +00:00
if err != nil {
log . Errorf ( "block signature verification failed: %s" , err )
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer ( "signature_verification_failed" )
2020-05-12 19:26:25 +00:00
return pubsub . ValidationReject
}
2020-05-12 18:05:29 +00:00
2020-06-23 14:59:44 +00:00
if blk . Header . ElectionProof . WinCount < 1 {
log . Errorf ( "block is not claiming to be winning" )
2020-08-28 07:15:11 +00:00
recordFailureFlagPeer ( "not_winning" )
2020-06-23 14:59:44 +00:00
return pubsub . ValidationReject
}
2020-05-12 18:05:29 +00:00
// it's a good block! make sure we've only seen it once
2020-02-17 05:51:18 +00:00
if bv . recvBlocks . add ( blk . Header . Cid ( ) ) > 0 {
// TODO: once these changes propagate to the network, we can consider
// dropping peers who send us the same block multiple times
2020-05-05 13:35:03 +00:00
return pubsub . ValidationIgnore
2020-02-17 05:51:18 +00:00
}
2020-05-12 18:05:29 +00:00
// all good, accept the block
2020-02-17 05:51:18 +00:00
msg . ValidatorData = blk
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . BlockValidationSuccess . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationAccept
2020-02-17 05:51:18 +00:00
}
2020-08-17 06:04:22 +00:00
func ( bv * BlockValidator ) validateLocalBlock ( ctx context . Context , msg * pubsub . Message ) pubsub . ValidationResult {
stats . Record ( ctx , metrics . BlockPublished . M ( 1 ) )
2020-08-17 18:03:08 +00:00
blk , what , err := bv . decodeAndCheckBlock ( msg )
2020-08-17 06:04:22 +00:00
if err != nil {
2020-08-17 18:03:08 +00:00
log . Errorf ( "got invalid local block: %s" , err )
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , what ) )
2020-08-17 06:04:22 +00:00
stats . Record ( ctx , metrics . BlockValidationFailure . M ( 1 ) )
return pubsub . ValidationIgnore
}
if count := bv . recvBlocks . add ( blk . Header . Cid ( ) ) ; count > 0 {
log . Warnf ( "local block has been seen %d times; ignoring" , count )
return pubsub . ValidationIgnore
}
msg . ValidatorData = blk
stats . Record ( ctx , metrics . BlockValidationSuccess . M ( 1 ) )
return pubsub . ValidationAccept
}
2020-08-17 18:03:08 +00:00
// decodeAndCheckBlock decodes the pubsub payload as a block message and runs
// the cheap structural checks that need no chain state.
//
// It returns the decoded block on success. On failure it returns a nil block,
// a short failure label used as the metrics failure type, and the error:
//   - "invalid": the payload does not decode, or the block has no header;
//   - "too_many_messages": more than build.BlockMessageLimit message CIDs;
//   - "missing_signature": the header carries no block signature.
func (bv *BlockValidator) decodeAndCheckBlock(msg *pubsub.Message) (*types.BlockMsg, string, error) {
	blk, err := types.DecodeBlockMsg(msg.GetData())
	if err != nil {
		return nil, "invalid", xerrors.Errorf("error decoding block: %w", err)
	}

	// The payload comes from the network; guard the header pointer before it
	// is dereferenced here and by every later validation step.
	if blk.Header == nil {
		return nil, "invalid", fmt.Errorf("block without a header")
	}

	if count := len(blk.BlsMessages) + len(blk.SecpkMessages); count > build.BlockMessageLimit {
		return nil, "too_many_messages", fmt.Errorf("block contains too many messages (%d)", count)
	}

	// make sure we have a signature
	if blk.Header.BlockSig == nil {
		return nil, "missing_signature", fmt.Errorf("block without a signature")
	}

	return blk, "", nil
}
2020-05-12 19:26:25 +00:00
func ( bv * BlockValidator ) isChainNearSynced ( ) bool {
ts := bv . chain . GetHeaviestTipSet ( )
timestamp := ts . MinTimestamp ( )
2020-07-10 14:43:14 +00:00
now := build . Clock . Now ( ) . UnixNano ( )
2020-05-12 19:26:25 +00:00
cutoff := uint64 ( now ) - uint64 ( 6 * time . Hour )
return timestamp > cutoff
}
func ( bv * BlockValidator ) validateMsgMeta ( ctx context . Context , msg * types . BlockMsg ) error {
2020-07-23 00:14:54 +00:00
// TODO there has to be a simpler way to do this without the blockstore dance
2020-07-23 02:05:11 +00:00
store := adt . WrapStore ( ctx , cbor . NewCborStore ( blockstore . NewTemporary ( ) ) )
2020-07-23 00:14:54 +00:00
bmArr := adt . MakeEmptyArray ( store )
smArr := adt . MakeEmptyArray ( store )
for i , m := range msg . BlsMessages {
2020-05-12 19:26:25 +00:00
c := cbg . CborCid ( m )
2020-07-23 00:14:54 +00:00
if err := bmArr . Set ( uint64 ( i ) , & c ) ; err != nil {
return err
}
2020-05-12 19:26:25 +00:00
}
2020-07-23 00:14:54 +00:00
for i , m := range msg . SecpkMessages {
2020-05-12 19:26:25 +00:00
c := cbg . CborCid ( m )
2020-07-23 00:14:54 +00:00
if err := smArr . Set ( uint64 ( i ) , & c ) ; err != nil {
return err
}
2020-05-12 19:26:25 +00:00
}
2020-07-23 00:14:54 +00:00
bmroot , err := bmArr . Root ( )
2020-05-12 19:26:25 +00:00
if err != nil {
return err
}
2020-07-23 00:14:54 +00:00
smroot , err := smArr . Root ( )
2020-05-12 19:26:25 +00:00
if err != nil {
return err
}
2020-07-23 00:14:54 +00:00
mrcid , err := store . Put ( store . Context ( ) , & types . MsgMeta {
2020-05-12 19:26:25 +00:00
BlsMessages : bmroot ,
SecpkMessages : smroot ,
} )
if err != nil {
return err
}
if msg . Header . Messages != mrcid {
return fmt . Errorf ( "messages didn't match root cid in header" )
}
return nil
}
2020-07-27 23:51:30 +00:00
func ( bv * BlockValidator ) checkPowerAndGetWorkerKey ( ctx context . Context , bh * types . BlockHeader ) ( address . Address , error ) {
addr := bh . Miner
2020-05-12 18:05:29 +00:00
2020-05-12 18:13:30 +00:00
bv . mx . Lock ( )
key , ok := bv . keycache [ addr . String ( ) ]
bv . mx . Unlock ( )
2020-07-27 23:51:30 +00:00
if ! ok {
// TODO I have a feeling all this can be simplified by cleverer DI to use the API
ts := bv . chain . GetHeaviestTipSet ( )
st , _ , err := bv . stmgr . TipSetState ( ctx , ts )
if err != nil {
return address . Undef , err
}
buf := bufbstore . NewBufferedBstore ( bv . chain . Blockstore ( ) )
cst := cbor . NewCborStore ( buf )
state , err := state . LoadStateTree ( cst , st )
if err != nil {
return address . Undef , err
}
act , err := state . GetActor ( addr )
if err != nil {
return address . Undef , err
}
2020-05-12 18:05:29 +00:00
2020-07-27 23:51:30 +00:00
blk , err := bv . chain . Blockstore ( ) . Get ( act . Head )
if err != nil {
return address . Undef , err
}
aso := blk . RawData ( )
2020-05-12 18:05:29 +00:00
2020-07-27 23:51:30 +00:00
var mst miner . State
err = mst . UnmarshalCBOR ( bytes . NewReader ( aso ) )
if err != nil {
return address . Undef , err
}
info , err := mst . GetInfo ( adt . WrapStore ( ctx , cst ) )
if err != nil {
return address . Undef , err
}
worker := info . Worker
key , err = bv . stmgr . ResolveToKeyAddress ( ctx , worker , ts )
if err != nil {
return address . Undef , err
}
bv . mx . Lock ( )
bv . keycache [ addr . String ( ) ] = key
bv . mx . Unlock ( )
2020-05-12 18:05:29 +00:00
}
2020-07-27 23:51:30 +00:00
// we check that the miner met the minimum power at the lookback tipset
2020-07-30 21:05:28 +00:00
baseTs := bv . chain . GetHeaviestTipSet ( )
2020-07-27 23:51:30 +00:00
lbts , err := stmgr . GetLookbackTipSetForRound ( ctx , bv . stmgr , baseTs , bh . Height )
2020-07-01 11:47:40 +00:00
if err != nil {
2020-08-27 13:53:53 +00:00
log . Warnf ( "failed to load lookback tipset for incoming block: %s" , err )
return address . Undef , ErrSoftFailure
2020-07-01 11:47:40 +00:00
}
2020-07-27 23:51:30 +00:00
hmp , err := stmgr . MinerHasMinPower ( ctx , bv . stmgr , bh . Miner , lbts )
2020-05-12 18:05:29 +00:00
if err != nil {
2020-08-27 13:53:53 +00:00
log . Warnf ( "failed to determine if incoming block's miner has minimum power: %s" , err )
return address . Undef , ErrSoftFailure
2020-05-12 18:05:29 +00:00
}
2020-07-27 23:51:30 +00:00
if ! hmp {
log . Warnf ( "incoming block's miner does not have minimum power" )
2020-08-27 13:53:53 +00:00
return address . Undef , ErrInsufficientPower
2020-07-27 23:51:30 +00:00
}
2020-05-12 18:13:30 +00:00
2020-05-12 18:05:29 +00:00
return key , nil
}
2020-02-17 05:51:18 +00:00
// blockReceiptCache counts how many times each block CID has been received.
type blockReceiptCache struct {
	// blocks maps a block CID to its receipt count (an int).
	blocks *lru.TwoQueueCache
}
// newBlockReceiptCache returns an empty receipt cache backed by a 2Q cache
// created with size 8192. The constructor error is discarded.
func newBlockReceiptCache() *blockReceiptCache {
	cache, _ := lru.New2Q(8192)

	brc := new(blockReceiptCache)
	brc.blocks = cache
	return brc
}
// add records one more receipt of the block bcid and returns how many times
// it had been recorded before this call (0 for a block not in the cache).
func (brc *blockReceiptCache) add(bcid cid.Cid) int {
	prev, seen := brc.blocks.Get(bcid)
	if !seen {
		brc.blocks.Add(bcid, int(1))
		return 0
	}

	count := prev.(int)
	brc.blocks.Add(bcid, count+1)
	return count
}
2020-02-28 01:39:07 +00:00
type MessageValidator struct {
2020-08-16 17:46:19 +00:00
self peer . ID
2020-02-28 01:39:07 +00:00
mpool * messagepool . MessagePool
}
2020-08-16 17:46:19 +00:00
func NewMessageValidator ( self peer . ID , mp * messagepool . MessagePool ) * MessageValidator {
return & MessageValidator { self : self , mpool : mp }
2020-02-28 01:39:07 +00:00
}
2020-05-05 13:35:03 +00:00
func ( mv * MessageValidator ) Validate ( ctx context . Context , pid peer . ID , msg * pubsub . Message ) pubsub . ValidationResult {
2020-08-16 17:46:19 +00:00
if pid == mv . self {
2020-08-17 06:04:22 +00:00
return mv . validateLocalMessage ( ctx , msg )
2020-08-16 17:46:19 +00:00
}
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageReceived . M ( 1 ) )
2020-02-28 01:39:07 +00:00
m , err := types . DecodeSignedMessage ( msg . Message . GetData ( ) )
if err != nil {
log . Warnf ( "failed to decode incoming message: %s" , err )
2020-03-02 00:57:16 +00:00
ctx , _ = tag . New ( ctx , tag . Insert ( metrics . FailureType , "decode" ) )
stats . Record ( ctx , metrics . MessageValidationFailure . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationReject
2020-02-28 01:39:07 +00:00
}
if err := mv . mpool . Add ( m ) ; err != nil {
2020-03-01 04:15:02 +00:00
log . Debugf ( "failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s" , m . Message . From , m . Message . To , m . Message . Nonce , types . FIL ( m . Message . Value ) , err )
2020-03-02 00:26:09 +00:00
ctx , _ = tag . New (
ctx ,
2020-08-28 06:11:24 +00:00
tag . Upsert ( metrics . Local , "false" ) ,
2020-03-02 00:26:09 +00:00
)
2020-08-28 07:01:45 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "add" )
2020-05-14 10:31:45 +00:00
switch {
2020-08-19 10:20:16 +00:00
case xerrors . Is ( err , messagepool . ErrBroadcastAnyway ) :
fallthrough
case xerrors . Is ( err , messagepool . ErrRBFTooLowPremium ) :
fallthrough
case xerrors . Is ( err , messagepool . ErrNonceTooLow ) :
2020-05-14 10:31:45 +00:00
return pubsub . ValidationIgnore
default :
return pubsub . ValidationReject
2020-05-05 13:35:03 +00:00
}
2020-02-28 01:39:07 +00:00
}
2020-03-02 00:57:16 +00:00
stats . Record ( ctx , metrics . MessageValidationSuccess . M ( 1 ) )
2020-05-05 13:35:03 +00:00
return pubsub . ValidationAccept
2020-02-28 01:39:07 +00:00
}
2020-08-17 06:04:22 +00:00
func ( mv * MessageValidator ) validateLocalMessage ( ctx context . Context , msg * pubsub . Message ) pubsub . ValidationResult {
2020-08-28 06:11:24 +00:00
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . Local , "true" ) ,
)
2020-08-17 06:04:22 +00:00
// do some lightweight validation
stats . Record ( ctx , metrics . MessagePublished . M ( 1 ) )
m , err := types . DecodeSignedMessage ( msg . Message . GetData ( ) )
if err != nil {
log . Warnf ( "failed to decode local message: %s" , err )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "decode" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if m . Size ( ) > 32 * 1024 {
log . Warnf ( "local message is too large! (%dB)" , m . Size ( ) )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "oversize" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if m . Message . To == address . Undef {
log . Warn ( "local message has invalid destination address" )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "undef-addr" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if ! m . Message . Value . LessThan ( types . TotalFilecoinInt ) {
log . Warnf ( "local messages has too high value: %s" , m . Message . Value )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "value-too-high" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
if err := mv . mpool . VerifyMsgSig ( m ) ; err != nil {
log . Warnf ( "signature verification failed for local message: %s" , err )
2020-08-28 06:53:59 +00:00
recordFailure ( ctx , metrics . MessageValidationFailure , "verify-sig" )
2020-08-17 06:04:22 +00:00
return pubsub . ValidationIgnore
}
stats . Record ( ctx , metrics . MessageValidationSuccess . M ( 1 ) )
return pubsub . ValidationAccept
}
2019-12-01 23:11:43 +00:00
func HandleIncomingMessages ( ctx context . Context , mpool * messagepool . MessagePool , msub * pubsub . Subscription ) {
2019-07-08 14:07:09 +00:00
for {
2020-02-28 01:39:07 +00:00
_ , err := msub . Next ( ctx )
2019-07-08 14:07:09 +00:00
if err != nil {
2019-09-17 14:23:08 +00:00
log . Warn ( "error from message subscription: " , err )
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingMessages loop" )
return
}
2019-07-08 14:07:09 +00:00
continue
}
2020-02-28 01:39:07 +00:00
// Do nothing... everything happens in validate
2019-07-08 14:07:09 +00:00
}
}
2020-08-28 06:53:59 +00:00
func recordFailure ( ctx context . Context , metric * stats . Int64Measure , failureType string ) {
ctx , _ = tag . New (
ctx ,
tag . Upsert ( metrics . FailureType , failureType ) ,
)
stats . Record ( ctx , metric . M ( 1 ) )
2020-08-28 07:01:45 +00:00
}