2019-07-08 14:07:09 +00:00
package sub
import (
"context"
2019-12-07 10:49:05 +00:00
"time"
2019-09-18 02:50:03 +00:00
2019-07-08 14:07:09 +00:00
logging "github.com/ipfs/go-log"
pubsub "github.com/libp2p/go-libp2p-pubsub"
2019-07-08 15:14:36 +00:00
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain"
2019-12-01 23:11:43 +00:00
"github.com/filecoin-project/lotus/chain/messagepool"
2019-10-18 04:47:41 +00:00
"github.com/filecoin-project/lotus/chain/types"
2019-07-08 14:07:09 +00:00
)
var log = logging . Logger ( "sub" )
func HandleIncomingBlocks ( ctx context . Context , bsub * pubsub . Subscription , s * chain . Syncer ) {
for {
msg , err := bsub . Next ( ctx )
if err != nil {
2019-09-17 14:23:08 +00:00
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingBlocks loop" )
return
}
2019-07-28 05:35:32 +00:00
log . Error ( "error from block subscription: " , err )
2019-07-08 14:07:09 +00:00
continue
}
2019-09-18 02:50:03 +00:00
blk , err := types . DecodeBlockMsg ( msg . GetData ( ) )
2019-07-08 14:07:09 +00:00
if err != nil {
log . Error ( "got invalid block over pubsub: " , err )
continue
}
go func ( ) {
2019-12-11 20:41:24 +00:00
log . Infof ( "New block over pubsub: %s" , blk . Cid ( ) )
start := time . Now ( )
2019-08-27 18:45:21 +00:00
log . Debug ( "about to fetch messages for block from pubsub" )
2019-08-01 20:40:47 +00:00
bmsgs , err := s . Bsync . FetchMessagesByCids ( context . TODO ( ) , blk . BlsMessages )
2019-07-08 14:07:09 +00:00
if err != nil {
2019-08-01 20:40:47 +00:00
log . Errorf ( "failed to fetch all bls messages for block received over pubusb: %s" , err )
2019-07-08 14:07:09 +00:00
return
}
2019-08-01 20:40:47 +00:00
smsgs , err := s . Bsync . FetchSignedMessagesByCids ( context . TODO ( ) , blk . SecpkMessages )
if err != nil {
log . Errorf ( "failed to fetch all secpk messages for block received over pubusb: %s" , err )
return
}
2019-12-11 20:41:24 +00:00
took := time . Since ( start )
log . Infow ( "new block over pubsub" , "cid" , blk . Header . Cid ( ) , "source" , msg . GetFrom ( ) , "msgfetch" , took )
2019-12-07 10:49:05 +00:00
if delay := time . Now ( ) . Unix ( ) - int64 ( blk . Header . Timestamp ) ; delay > 5 {
log . Warnf ( "Received block with large delay %d from miner %s" , delay , blk . Header . Miner )
}
2019-07-25 22:15:03 +00:00
s . InformNewBlock ( msg . GetFrom ( ) , & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : blk . Header ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-07-08 14:07:09 +00:00
} )
} ( )
}
}
2019-12-01 23:11:43 +00:00
func HandleIncomingMessages ( ctx context . Context , mpool * messagepool . MessagePool , msub * pubsub . Subscription ) {
2019-07-08 14:07:09 +00:00
for {
msg , err := msub . Next ( ctx )
if err != nil {
2019-09-17 14:23:08 +00:00
log . Warn ( "error from message subscription: " , err )
if ctx . Err ( ) != nil {
log . Warn ( "quitting HandleIncomingMessages loop" )
return
}
2019-07-08 14:07:09 +00:00
continue
}
2019-07-25 22:15:03 +00:00
m , err := types . DecodeSignedMessage ( msg . GetData ( ) )
2019-07-08 14:07:09 +00:00
if err != nil {
log . Errorf ( "got incorrectly formatted Message: %s" , err )
continue
}
if err := mpool . Add ( m ) ; err != nil {
2019-12-07 22:32:34 +00:00
log . Warnf ( "failed to add message from network to message pool (From: %s, To: %s, Nonce: %d, Value: %s): %s" , m . Message . From , m . Message . To , m . Message . Nonce , types . FIL ( m . Message . Value ) , err )
2019-07-08 14:07:09 +00:00
continue
}
}
}