2019-07-05 14:29:17 +00:00
package chain
import (
2020-03-19 02:42:24 +00:00
"bytes"
2019-07-05 14:29:17 +00:00
"context"
2019-11-06 15:11:19 +00:00
"errors"
2019-07-05 14:29:17 +00:00
"fmt"
2020-04-14 14:09:11 +00:00
"sort"
2020-09-09 05:14:01 +00:00
"sync"
2019-08-30 02:59:54 +00:00
"time"
2019-07-05 14:29:17 +00:00
2019-10-23 14:45:03 +00:00
"github.com/Gurpartap/async"
2019-11-11 19:32:30 +00:00
"github.com/hashicorp/go-multierror"
2020-09-24 01:53:28 +00:00
blocks "github.com/ipfs/go-block-format"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
2020-02-04 22:19:05 +00:00
cbor "github.com/ipfs/go-ipld-cbor"
2022-06-28 11:09:59 +00:00
ipld "github.com/ipfs/go-ipld-format"
2020-01-08 19:10:57 +00:00
logging "github.com/ipfs/go-log/v2"
2022-08-25 18:20:41 +00:00
"github.com/libp2p/go-libp2p/core/connmgr"
"github.com/libp2p/go-libp2p/core/peer"
2019-09-17 18:09:22 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
2020-02-26 02:42:34 +00:00
"go.opencensus.io/stats"
2019-10-11 02:17:24 +00:00
"go.opencensus.io/trace"
2019-09-17 18:09:22 +00:00
"golang.org/x/xerrors"
2019-11-11 19:32:30 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/crypto"
2022-06-14 15:00:51 +00:00
"github.com/filecoin-project/pubsub"
"github.com/filecoin-project/lotus/chain/consensus"
"github.com/filecoin-project/lotus/node/modules/dtypes"
2020-10-08 01:09:33 +00:00
// named blockadt here to make it clear that these are the types used by
// messages, regardless of specs-actors version.
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/api"
2021-01-29 20:01:00 +00:00
bstore "github.com/filecoin-project/lotus/blockstore"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/build"
2020-03-25 23:16:17 +00:00
"github.com/filecoin-project/lotus/chain/beacon"
2020-09-07 18:31:43 +00:00
"github.com/filecoin-project/lotus/chain/exchange"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2020-05-12 11:48:09 +00:00
"github.com/filecoin-project/lotus/chain/vm"
2020-02-26 02:42:34 +00:00
"github.com/filecoin-project/lotus/metrics"
2019-07-05 14:29:17 +00:00
)
2020-09-14 20:58:59 +00:00
var (
// LocalIncoming is the _local_ pubsub (unrelated to libp2p pubsub) topic
// where the Syncer publishes candidate chain heads to be synced.
LocalIncoming = "incoming"
2020-09-16 18:04:44 +00:00
log = logging . Logger ( "chain" )
2020-08-25 19:39:17 +00:00
2020-09-16 19:09:36 +00:00
concurrentSyncRequests = exchange . ShufflePeersPrefix
2020-09-16 18:55:51 +00:00
syncRequestBatchSize = 8
2020-09-16 18:04:44 +00:00
syncRequestRetries = 5
)
2020-08-25 19:39:17 +00:00
2020-06-23 21:51:25 +00:00
// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
//
2022-08-29 14:25:30 +00:00
// - Fast-forwards the chain as it learns of new TipSets from the network via
// the SyncManager.
// - Applies the fork choice rule to select the correct side when confronted
// with a fork in the network.
// - Requests block headers and messages from other peers when not available
// in our BlockStore.
// - Tracks blocks marked as bad in a cache.
// - Keeps the BlockStore and ChainStore consistent with our view of the world,
// the latter of which in turn informs other components when a reorg has been
// committed.
2020-06-23 21:51:25 +00:00
//
// The Syncer does not run workers itself. It's mainly concerned with
// ensuring a consistent state of chain consensus. The reactive and network-
// interfacing processes are part of other components, such as the SyncManager
2020-09-07 18:45:34 +00:00
// (which owns the sync scheduler and sync workers), ChainExchange, the HELLO
2020-06-23 21:51:25 +00:00
// protocol, and the gossipsub block propagation layer.
//
// {hint/concept} The fork-choice rule as it currently stands is: "pick the
// chain with the heaviest weight, so long as it hasn’ t deviated one finality
// threshold from our head (900 epochs, parameter determined by spec-actors)".
2019-07-05 14:29:17 +00:00
type Syncer struct {
// The interface for accessing and putting tipsets into local storage
2019-07-26 04:54:22 +00:00
store * store . ChainStore
2019-07-05 14:29:17 +00:00
2020-03-25 23:16:17 +00:00
// handle to the random beacon for verification
2020-09-08 20:28:06 +00:00
beacon beacon . Schedule
2020-03-25 23:16:17 +00:00
2019-09-06 06:26:02 +00:00
// the state manager handles making state queries
sm * stmgr . StateManager
2021-09-02 16:07:23 +00:00
consensus consensus . Consensus
2019-07-05 14:46:21 +00:00
// The known Genesis tipset
2019-07-26 04:54:22 +00:00
Genesis * types . TipSet
2019-07-05 14:29:17 +00:00
// TipSets known to be invalid
2019-10-09 08:50:57 +00:00
bad * BadBlockCache
2019-07-05 14:29:17 +00:00
// handle to the block sync service
2020-09-07 18:31:43 +00:00
Exchange exchange . Client
2019-07-05 14:29:17 +00:00
2019-07-11 02:36:43 +00:00
self peer . ID
2020-09-14 20:58:59 +00:00
syncmgr SyncManager
2019-11-18 21:39:07 +00:00
2019-12-19 06:19:15 +00:00
connmgr connmgr . ConnManager
2019-11-18 21:39:07 +00:00
incoming * pubsub . PubSub
2019-12-19 06:19:15 +00:00
receiptTracker * blockReceiptTracker
2020-04-17 14:47:19 +00:00
2020-09-06 04:32:05 +00:00
tickerCtxCancel context . CancelFunc
2020-09-09 05:14:01 +00:00
ds dtypes . MetadataDS
2019-07-05 14:29:17 +00:00
}
2020-09-14 20:58:59 +00:00
// SyncManagerCtor builds a SyncManager around the function that performs the
// actual sync work. NewSyncer invokes it with the Syncer's own Sync method.
type SyncManagerCtor func ( syncFn SyncFunc ) SyncManager
2021-09-02 16:07:23 +00:00
// Genesis is a distinct named type for a pointer to the genesis tipset. It is
// what LoadGenesis returns and what NewSyncer takes as its genesis argument.
type Genesis * types . TipSet
2021-12-11 21:03:00 +00:00
func LoadGenesis ( ctx context . Context , sm * stmgr . StateManager ) ( Genesis , error ) {
gen , err := sm . ChainStore ( ) . GetGenesis ( ctx )
2019-07-05 14:29:17 +00:00
if err != nil {
2020-05-08 17:59:18 +00:00
return nil , xerrors . Errorf ( "getting genesis block: %w" , err )
2019-07-05 14:29:17 +00:00
}
2021-09-02 16:07:23 +00:00
return types . NewTipSet ( [ ] * types . BlockHeader { gen } )
}
// NewSyncer creates a new Syncer object.
func NewSyncer ( ds dtypes . MetadataDS ,
sm * stmgr . StateManager ,
exchange exchange . Client ,
syncMgrCtor SyncManagerCtor ,
connmgr connmgr . ConnManager ,
self peer . ID ,
beacon beacon . Schedule ,
gent Genesis ,
consensus consensus . Consensus ) ( * Syncer , error ) {
2019-07-05 14:29:17 +00:00
2019-11-15 21:35:29 +00:00
s := & Syncer {
2020-09-09 05:14:01 +00:00
ds : ds ,
2020-03-25 23:16:17 +00:00
beacon : beacon ,
2019-12-19 06:19:15 +00:00
bad : NewBadBlockCache ( ) ,
Genesis : gent ,
2021-09-02 16:07:23 +00:00
consensus : consensus ,
2020-09-07 18:31:43 +00:00
Exchange : exchange ,
2019-12-19 06:19:15 +00:00
store : sm . ChainStore ( ) ,
sm : sm ,
self : self ,
receiptTracker : newBlockReceiptTracker ( ) ,
connmgr : connmgr ,
2019-11-18 21:39:07 +00:00
2019-11-20 19:44:38 +00:00
incoming : pubsub . New ( 50 ) ,
2019-11-15 21:35:29 +00:00
}
2020-09-14 20:58:59 +00:00
s . syncmgr = syncMgrCtor ( s . Sync )
2019-11-15 21:35:29 +00:00
return s , nil
2019-07-05 14:29:17 +00:00
}
2019-11-15 21:35:29 +00:00
func ( syncer * Syncer ) Start ( ) {
2020-09-06 04:32:05 +00:00
tickerCtx , tickerCtxCancel := context . WithCancel ( context . Background ( ) )
2019-11-15 21:35:29 +00:00
syncer . syncmgr . Start ( )
2020-09-06 04:32:05 +00:00
syncer . tickerCtxCancel = tickerCtxCancel
go syncer . runMetricsTricker ( tickerCtx )
}
func ( syncer * Syncer ) runMetricsTricker ( tickerCtx context . Context ) {
genesisTime := time . Unix ( int64 ( syncer . Genesis . MinTimestamp ( ) ) , 0 )
ticker := build . Clock . Ticker ( time . Duration ( build . BlockDelaySecs ) * time . Second )
defer ticker . Stop ( )
for {
select {
case <- ticker . C :
sinceGenesis := build . Clock . Now ( ) . Sub ( genesisTime )
expectedHeight := int64 ( sinceGenesis . Seconds ( ) ) / int64 ( build . BlockDelaySecs )
2020-09-07 16:21:45 +00:00
stats . Record ( tickerCtx , metrics . ChainNodeHeightExpected . M ( expectedHeight ) )
2020-09-06 04:32:05 +00:00
case <- tickerCtx . Done ( ) :
return
}
}
2019-11-15 21:35:29 +00:00
}
func ( syncer * Syncer ) Stop ( ) {
syncer . syncmgr . Stop ( )
2020-09-06 04:32:05 +00:00
syncer . tickerCtxCancel ( )
2019-07-05 14:29:17 +00:00
}
// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
2019-12-17 07:06:48 +00:00
func ( syncer * Syncer ) InformNewHead ( from peer . ID , fts * store . FullTipSet ) bool {
2020-10-08 20:49:36 +00:00
defer func ( ) {
if err := recover ( ) ; err != nil {
2021-09-02 16:07:23 +00:00
log . Errorf ( "panic in InformNewHead: %s" , err )
2020-10-08 20:49:36 +00:00
}
} ( )
2019-10-10 11:13:26 +00:00
ctx := context . Background ( )
2019-07-05 14:29:17 +00:00
if fts == nil {
2019-11-16 23:41:14 +00:00
log . Errorf ( "got nil tipset in InformNewHead" )
2019-12-17 07:06:48 +00:00
return false
2019-07-05 14:29:17 +00:00
}
2019-10-06 03:32:56 +00:00
2021-09-02 16:07:23 +00:00
if syncer . consensus . IsEpochBeyondCurrMax ( fts . TipSet ( ) . Height ( ) ) {
2020-06-12 04:47:57 +00:00
log . Errorf ( "Received block with impossibly large height %d" , fts . TipSet ( ) . Height ( ) )
return false
}
2019-10-06 03:32:56 +00:00
for _ , b := range fts . Blocks {
2020-02-12 07:21:11 +00:00
if reason , ok := syncer . bad . Has ( b . Cid ( ) ) ; ok {
log . Warnf ( "InformNewHead called on block marked as bad: %s (reason: %s)" , b . Cid ( ) , reason )
2020-02-07 06:39:24 +00:00
return false
}
2019-10-15 12:19:10 +00:00
if err := syncer . ValidateMsgMeta ( b ) ; err != nil {
2019-10-06 03:32:56 +00:00
log . Warnf ( "invalid block received: %s" , err )
2019-12-17 07:06:48 +00:00
return false
2019-10-06 03:32:56 +00:00
}
}
2019-12-01 23:11:43 +00:00
syncer . incoming . Pub ( fts . TipSet ( ) . Blocks ( ) , LocalIncoming )
2019-11-18 21:39:07 +00:00
2019-11-15 03:19:16 +00:00
// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
// the blockstore
2021-12-11 21:03:00 +00:00
if err := syncer . store . PersistBlockHeaders ( ctx , fts . TipSet ( ) . Blocks ( ) ... ) ; err != nil {
2019-11-15 03:19:16 +00:00
log . Warn ( "failed to persist incoming block header: " , err )
2019-12-17 07:06:48 +00:00
return false
2019-11-15 03:19:16 +00:00
}
2020-09-07 18:31:43 +00:00
syncer . Exchange . AddPeer ( from )
2019-07-05 14:29:17 +00:00
2020-10-20 02:09:49 +00:00
hts := syncer . store . GetHeaviestTipSet ( )
bestPweight := hts . ParentWeight ( )
2020-08-06 15:00:45 +00:00
targetWeight := fts . TipSet ( ) . ParentWeight ( )
2019-11-10 23:06:06 +00:00
if targetWeight . LessThan ( bestPweight ) {
2019-12-04 04:59:41 +00:00
var miners [ ] string
for _ , blk := range fts . TipSet ( ) . Blocks ( ) {
miners = append ( miners , blk . Miner . String ( ) )
}
2020-11-03 12:28:31 +00:00
log . Debugw ( "incoming tipset does not appear to be better than our best chain, ignoring for now" , "miners" , miners , "bestPweight" , bestPweight , "bestTS" , hts . Cids ( ) , "incomingWeight" , targetWeight , "incomingTS" , fts . TipSet ( ) . Cids ( ) )
2019-12-17 07:06:48 +00:00
return false
2019-11-09 20:14:40 +00:00
}
2019-11-16 06:48:42 +00:00
syncer . syncmgr . SetPeerHead ( ctx , from , fts . TipSet ( ) )
2019-12-17 07:06:48 +00:00
return true
2019-07-05 14:29:17 +00:00
}
2020-06-23 21:51:25 +00:00
// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to
// receive new block headers as they arrive from the network, and sends them to
// the returned channel.
//
// These blocks have not necessarily been incorporated to our view of the chain.
2019-11-18 21:39:07 +00:00
func ( syncer * Syncer ) IncomingBlocks ( ctx context . Context ) ( <- chan * types . BlockHeader , error ) {
2019-12-01 23:11:43 +00:00
sub := syncer . incoming . Sub ( LocalIncoming )
2019-11-18 21:39:07 +00:00
out := make ( chan * types . BlockHeader , 10 )
go func ( ) {
2022-05-10 17:40:40 +00:00
defer syncer . incoming . Unsub ( sub )
2019-11-19 19:49:11 +00:00
2019-11-18 21:39:07 +00:00
for {
select {
case r := <- sub :
hs := r . ( [ ] * types . BlockHeader )
for _ , h := range hs {
select {
case out <- h :
case <- ctx . Done ( ) :
return
}
}
case <- ctx . Done ( ) :
return
}
}
} ( )
return out , nil
}
2020-06-23 21:51:25 +00:00
// ValidateMsgMeta performs structural and content hash validation of the
// messages within this block. If validation passes, it stores the messages in
// the underlying IPLD block store.
2019-10-15 12:19:10 +00:00
func ( syncer * Syncer ) ValidateMsgMeta ( fblk * types . FullBlock ) error {
2020-01-07 20:41:26 +00:00
if msgc := len ( fblk . BlsMessages ) + len ( fblk . SecpkMessages ) ; msgc > build . BlockMessageLimit {
return xerrors . Errorf ( "block %s has too many messages (%d)" , fblk . Header . Cid ( ) , msgc )
}
2019-11-15 03:19:16 +00:00
// TODO: IMPORTANT(GARBAGE). These message puts and the msgmeta
// computation need to go into the 'temporary' side of the blockstore when
// we implement that
2020-10-14 19:23:11 +00:00
// We use a temporary bstore here to avoid writing intermediate pieces
// into the blockstore.
2021-01-29 20:01:00 +00:00
blockstore := bstore . NewMemory ( )
2020-10-14 19:23:11 +00:00
cst := cbor . NewCborStore ( blockstore )
2021-12-17 09:42:09 +00:00
ctx := context . Background ( )
2020-10-14 19:23:11 +00:00
var bcids , scids [ ] cid . Cid
2019-10-06 03:32:56 +00:00
2019-11-15 03:19:16 +00:00
for _ , m := range fblk . BlsMessages {
2021-12-17 09:42:09 +00:00
c , err := store . PutMessage ( ctx , blockstore , m )
2019-11-15 03:19:16 +00:00
if err != nil {
return xerrors . Errorf ( "putting bls message to blockstore after msgmeta computation: %w" , err )
}
2020-10-14 19:23:11 +00:00
bcids = append ( bcids , c )
2019-11-15 03:19:16 +00:00
}
for _ , m := range fblk . SecpkMessages {
2021-12-17 09:42:09 +00:00
c , err := store . PutMessage ( ctx , blockstore , m )
2019-11-15 03:19:16 +00:00
if err != nil {
return xerrors . Errorf ( "putting bls message to blockstore after msgmeta computation: %w" , err )
}
2020-10-14 19:23:11 +00:00
scids = append ( scids , c )
2019-11-15 03:19:16 +00:00
}
2020-10-14 19:23:11 +00:00
// Compute the root CID of the combined message trie.
smroot , err := computeMsgMeta ( cst , bcids , scids )
if err != nil {
return xerrors . Errorf ( "validating msgmeta, compute failed: %w" , err )
}
// Check that the message trie root matches with what's in the block.
if fblk . Header . Messages != smroot {
return xerrors . Errorf ( "messages in full block did not match msgmeta root in header (%s != %s)" , fblk . Header . Messages , smroot )
}
// Finally, flush.
2021-02-28 22:48:36 +00:00
return vm . Copy ( context . TODO ( ) , blockstore , syncer . store . ChainBlockstore ( ) , smroot )
2019-10-06 03:32:56 +00:00
}
2019-10-14 14:21:37 +00:00
// LocalPeer returns the peer ID this Syncer was constructed with (the `self`
// argument of NewSyncer).
func ( syncer * Syncer ) LocalPeer ( ) peer . ID {
return syncer . self
}
// ChainStore returns the chain store used by this Syncer.
func ( syncer * Syncer ) ChainStore ( ) * store . ChainStore {
return syncer . store
}
2019-12-17 07:06:48 +00:00
func ( syncer * Syncer ) InformNewBlock ( from peer . ID , blk * types . FullBlock ) bool {
2019-07-05 14:29:17 +00:00
// TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { Blocks : [ ] * types . FullBlock { blk } }
2019-12-17 07:06:48 +00:00
return syncer . InformNewHead ( from , fts )
2019-07-05 14:29:17 +00:00
}
2020-09-24 01:53:28 +00:00
func copyBlockstore ( ctx context . Context , from , to bstore . Blockstore ) error {
ctx , span := trace . StartSpan ( ctx , "copyBlockstore" )
defer span . End ( )
cids , err := from . AllKeysChan ( ctx )
2019-07-05 14:29:17 +00:00
if err != nil {
return err
}
2020-09-24 01:53:28 +00:00
// TODO: should probably expose better methods on the blockstore for this operation
var blks [ ] blocks . Block
2019-07-05 14:29:17 +00:00
for c := range cids {
2021-12-11 21:03:00 +00:00
b , err := from . Get ( ctx , c )
2019-07-05 14:29:17 +00:00
if err != nil {
return err
}
2020-09-24 01:53:28 +00:00
blks = append ( blks , b )
}
2021-12-11 21:03:00 +00:00
if err := to . PutMany ( ctx , blks ) ; err != nil {
2020-09-24 01:53:28 +00:00
return err
2019-07-05 14:29:17 +00:00
}
return nil
}
2019-08-01 20:40:47 +00:00
// TODO: this function effectively accepts unchecked input from the network,
// either validate it here, or ensure that its validated elsewhere (maybe make
// sure the blocksync code checks it?)
// maybe this code should actually live in blocksync??
2020-02-05 02:26:42 +00:00
func zipTipSetAndMessages ( bs cbor . IpldStore , ts * types . TipSet , allbmsgs [ ] * types . Message , allsmsgs [ ] * types . SignedMessage , bmi , smi [ ] [ ] uint64 ) ( * store . FullTipSet , error ) {
2019-08-01 20:40:47 +00:00
if len ( ts . Blocks ( ) ) != len ( smi ) || len ( ts . Blocks ( ) ) != len ( bmi ) {
2019-07-05 14:29:17 +00:00
return nil , fmt . Errorf ( "msgincl length didnt match tipset size" )
}
2021-09-02 16:07:23 +00:00
if err := checkMsgMeta ( ts , allbmsgs , allsmsgs , bmi , smi ) ; err != nil {
return nil , err
}
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { }
2019-07-05 14:29:17 +00:00
for bi , b := range ts . Blocks ( ) {
2020-07-10 23:59:58 +00:00
2019-08-01 20:40:47 +00:00
var smsgs [ ] * types . SignedMessage
for _ , m := range smi [ bi ] {
smsgs = append ( smsgs , allsmsgs [ m ] )
2019-07-05 14:29:17 +00:00
}
2019-08-01 20:40:47 +00:00
var bmsgs [ ] * types . Message
for _ , m := range bmi [ bi ] {
bmsgs = append ( bmsgs , allbmsgs [ m ] )
2019-07-05 14:29:17 +00:00
}
2019-07-25 22:15:03 +00:00
fb := & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : b ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-07-05 14:29:17 +00:00
}
fts . Blocks = append ( fts . Blocks , fb )
}
return fts , nil
}
2020-06-23 21:51:25 +00:00
// computeMsgMeta computes the root CID of the combined arrays of message CIDs
// of both types (BLS and Secpk).
2020-07-23 00:14:54 +00:00
func computeMsgMeta ( bs cbor . IpldStore , bmsgCids , smsgCids [ ] cid . Cid ) ( cid . Cid , error ) {
2020-09-28 21:25:58 +00:00
// block headers use adt0
2020-10-08 01:09:33 +00:00
store := blockadt . WrapStore ( context . TODO ( ) , bs )
bmArr := blockadt . MakeEmptyArray ( store )
smArr := blockadt . MakeEmptyArray ( store )
2020-07-23 00:14:54 +00:00
for i , m := range bmsgCids {
c := cbg . CborCid ( m )
if err := bmArr . Set ( uint64 ( i ) , & c ) ; err != nil {
return cid . Undef , err
}
}
for i , m := range smsgCids {
c := cbg . CborCid ( m )
if err := smArr . Set ( uint64 ( i ) , & c ) ; err != nil {
return cid . Undef , err
}
}
bmroot , err := bmArr . Root ( )
2019-10-06 03:32:56 +00:00
if err != nil {
return cid . Undef , err
}
2020-07-23 00:14:54 +00:00
smroot , err := smArr . Root ( )
2019-10-06 03:32:56 +00:00
if err != nil {
return cid . Undef , err
}
2020-07-23 00:14:54 +00:00
mrcid , err := store . Put ( store . Context ( ) , & types . MsgMeta {
2019-10-06 03:32:56 +00:00
BlsMessages : bmroot ,
SecpkMessages : smroot ,
} )
if err != nil {
return cid . Undef , xerrors . Errorf ( "failed to put msgmeta: %w" , err )
}
return mrcid , nil
}
2020-06-23 21:51:25 +00:00
// FetchTipSet tries to load the provided tipset from the store, and falls back
2020-09-07 18:31:43 +00:00
// to the network (client) by querying the supplied peer if not found
2020-06-23 21:51:25 +00:00
// locally.
//
// {hint/usage} This is used from the HELLO protocol, to fetch the greeting
// peer's heaviest tipset if we don't have it.
2019-12-16 19:22:56 +00:00
func ( syncer * Syncer ) FetchTipSet ( ctx context . Context , p peer . ID , tsk types . TipSetKey ) ( * store . FullTipSet , error ) {
2021-12-11 21:03:00 +00:00
if fts , err := syncer . tryLoadFullTipSet ( ctx , tsk ) ; err == nil {
2019-07-05 14:29:17 +00:00
return fts , nil
}
2020-06-23 21:51:25 +00:00
// fall back to the network.
2020-09-07 18:31:43 +00:00
return syncer . Exchange . GetFullTipSet ( ctx , p , tsk )
2019-07-05 14:29:17 +00:00
}
2020-06-23 21:51:25 +00:00
// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full
// representation of it containing FullBlocks. If ALL blocks are not found
// locally, it errors entirely with blockstore.ErrNotFound.
2021-12-11 21:03:00 +00:00
func ( syncer * Syncer ) tryLoadFullTipSet ( ctx context . Context , tsk types . TipSetKey ) ( * store . FullTipSet , error ) {
ts , err := syncer . store . LoadTipSet ( ctx , tsk )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { }
2019-07-05 14:29:17 +00:00
for _ , b := range ts . Blocks ( ) {
2021-12-17 09:42:09 +00:00
bmsgs , smsgs , err := syncer . store . MessagesForBlock ( ctx , b )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
2019-07-25 22:15:03 +00:00
fb := & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : b ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-07-05 14:29:17 +00:00
}
fts . Blocks = append ( fts . Blocks , fb )
}
return fts , nil
}
2020-06-23 21:51:25 +00:00
// Sync tries to advance our view of the chain to `maybeHead`. It does nothing
// if our current head is heavier than the requested tipset, or if we're already
// at the requested head, or if the head is the genesis.
//
// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the
// godocs on that method for a more detailed view.
2019-10-10 11:13:26 +00:00
func ( syncer * Syncer ) Sync ( ctx context . Context , maybeHead * types . TipSet ) error {
ctx , span := trace . StartSpan ( ctx , "chain.Sync" )
defer span . End ( )
2019-11-16 01:05:16 +00:00
2019-11-10 23:06:06 +00:00
if span . IsRecordingEvents ( ) {
span . AddAttributes (
trace . StringAttribute ( "tipset" , fmt . Sprint ( maybeHead . Cids ( ) ) ) ,
trace . Int64Attribute ( "height" , int64 ( maybeHead . Height ( ) ) ) ,
)
}
2019-10-06 03:32:56 +00:00
2020-11-02 21:56:26 +00:00
hts := syncer . store . GetHeaviestTipSet ( )
if hts . ParentWeight ( ) . GreaterThan ( maybeHead . ParentWeight ( ) ) {
2019-11-20 19:44:38 +00:00
return nil
}
2020-11-02 21:56:26 +00:00
if syncer . Genesis . Equals ( maybeHead ) || hts . Equals ( maybeHead ) {
2019-07-05 14:29:17 +00:00
return nil
}
2021-04-28 19:49:21 +00:00
if err := syncer . collectChain ( ctx , maybeHead , hts , false ) ; err != nil {
2019-11-10 23:06:06 +00:00
span . AddAttributes ( trace . StringAttribute ( "col_error" , err . Error ( ) ) )
2019-12-04 23:20:02 +00:00
span . SetStatus ( trace . Status {
Code : 13 ,
Message : err . Error ( ) ,
} )
2019-08-16 00:17:09 +00:00
return xerrors . Errorf ( "collectChain failed: %w" , err )
2019-07-05 14:29:17 +00:00
}
2020-06-09 23:17:28 +00:00
// At this point we have accepted and synced to the new `maybeHead`
// (`StageSyncComplete`).
2019-10-15 05:00:30 +00:00
if err := syncer . store . PutTipSet ( ctx , maybeHead ) ; err != nil {
2019-11-10 23:06:06 +00:00
span . AddAttributes ( trace . StringAttribute ( "put_error" , err . Error ( ) ) )
2019-12-04 23:20:02 +00:00
span . SetStatus ( trace . Status {
Code : 13 ,
Message : err . Error ( ) ,
} )
2019-10-06 00:51:48 +00:00
return xerrors . Errorf ( "failed to put synced tipset to chainstore: %w" , err )
2019-07-05 14:29:17 +00:00
}
2019-12-19 06:19:15 +00:00
peers := syncer . receiptTracker . GetPeers ( maybeHead )
if len ( peers ) > 0 {
syncer . connmgr . TagPeer ( peers [ 0 ] , "new-block" , 40 )
for _ , p := range peers [ 1 : ] {
syncer . connmgr . TagPeer ( p , "new-block" , 25 )
}
}
2019-07-05 14:29:17 +00:00
return nil
}
2019-11-06 19:26:01 +00:00
func isPermanent ( err error ) bool {
2021-09-02 16:07:23 +00:00
return ! errors . Is ( err , consensus . ErrTemporal )
2019-11-06 19:26:01 +00:00
}
2020-09-30 05:39:06 +00:00
func ( syncer * Syncer ) ValidateTipSet ( ctx context . Context , fts * store . FullTipSet , useCache bool ) error {
2019-10-12 09:44:56 +00:00
ctx , span := trace . StartSpan ( ctx , "validateTipSet" )
defer span . End ( )
2019-12-17 03:36:32 +00:00
span . AddAttributes ( trace . Int64Attribute ( "height" , int64 ( fts . TipSet ( ) . Height ( ) ) ) )
2019-07-05 14:29:17 +00:00
ts := fts . TipSet ( )
2019-07-05 14:46:21 +00:00
if ts . Equals ( syncer . Genesis ) {
2019-07-05 14:29:17 +00:00
return nil
}
2020-06-22 23:09:05 +00:00
var futures [ ] async . ErrorFuture
2019-07-05 14:29:17 +00:00
for _ , b := range fts . Blocks {
2020-06-23 11:21:01 +00:00
b := b // rebind to a scoped variable
2020-06-22 23:09:05 +00:00
futures = append ( futures , async . Err ( func ( ) error {
2020-09-30 05:39:06 +00:00
if err := syncer . ValidateBlock ( ctx , b , useCache ) ; err != nil {
2020-06-22 23:09:05 +00:00
if isPermanent ( err ) {
2020-07-08 01:55:56 +00:00
syncer . bad . Add ( b . Cid ( ) , NewBadBlockReason ( [ ] cid . Cid { b . Cid ( ) } , err . Error ( ) ) )
2020-06-22 23:09:05 +00:00
}
return xerrors . Errorf ( "validating block %s: %w" , b . Cid ( ) , err )
2019-11-06 15:11:19 +00:00
}
2019-10-10 03:04:10 +00:00
2021-12-11 21:03:00 +00:00
if err := syncer . sm . ChainStore ( ) . AddToTipSetTracker ( ctx , b . Header ) ; err != nil {
2020-06-22 23:09:05 +00:00
return xerrors . Errorf ( "failed to add validated header to tipset tracker: %w" , err )
}
return nil
} ) )
}
for _ , f := range futures {
if err := f . AwaitContext ( ctx ) ; err != nil {
return err
2019-10-10 03:04:10 +00:00
}
2019-07-05 14:29:17 +00:00
}
return nil
}
2020-06-02 14:29:39 +00:00
// ValidateBlock should match up with 'Semantical Validation' in validation.md in the spec
2020-09-30 05:39:06 +00:00
func ( syncer * Syncer ) ValidateBlock ( ctx context . Context , b * types . FullBlock , useCache bool ) ( err error ) {
2020-06-14 09:49:20 +00:00
defer func ( ) {
// b.Cid() could panic for empty blocks that are used in tests.
if rerr := recover ( ) ; rerr != nil {
err = xerrors . Errorf ( "validate block panic: %w" , rerr )
return
}
} ( )
2020-09-30 05:39:06 +00:00
if useCache {
isValidated , err := syncer . store . IsBlockValidated ( ctx , b . Cid ( ) )
if err != nil {
return xerrors . Errorf ( "check block validation cache %s: %w" , b . Cid ( ) , err )
}
2020-06-14 09:49:20 +00:00
2020-09-30 05:39:06 +00:00
if isValidated {
return nil
}
2020-06-14 09:49:20 +00:00
}
2020-07-10 14:43:14 +00:00
validationStart := build . Clock . Now ( )
2020-06-04 22:18:14 +00:00
defer func ( ) {
2020-07-24 05:47:41 +00:00
stats . Record ( ctx , metrics . BlockValidationDurationMilliseconds . M ( metrics . SinceInMilliseconds ( validationStart ) ) )
2020-08-26 21:48:53 +00:00
log . Infow ( "block validation" , "took" , time . Since ( validationStart ) , "height" , b . Header . Height , "age" , time . Since ( time . Unix ( int64 ( b . Header . Timestamp ) , 0 ) ) )
2020-06-04 22:18:14 +00:00
} ( )
2019-10-12 09:44:56 +00:00
ctx , span := trace . StartSpan ( ctx , "validateBlock" )
defer span . End ( )
2020-06-04 22:18:14 +00:00
2021-09-02 16:07:23 +00:00
if err := syncer . consensus . ValidateBlock ( ctx , b ) ; err != nil {
return err
2020-03-26 00:01:49 +00:00
}
2019-10-14 03:28:19 +00:00
2020-09-30 05:39:06 +00:00
if useCache {
if err := syncer . store . MarkBlockAsValidated ( ctx , b . Cid ( ) ) ; err != nil {
return xerrors . Errorf ( "caching block validation %s: %w" , b . Cid ( ) , err )
}
2020-06-14 09:49:20 +00:00
}
return nil
2019-10-14 03:28:19 +00:00
}
2019-11-19 20:36:27 +00:00
// syncStateKey is the context key under which extractSyncState looks up the
// *SyncerState carried by a context.
type syncStateKey struct { }
2019-11-15 21:35:29 +00:00
func extractSyncState ( ctx context . Context ) * SyncerState {
2019-11-19 20:36:27 +00:00
v := ctx . Value ( syncStateKey { } )
2019-11-15 21:35:29 +00:00
if v != nil {
return v . ( * SyncerState )
}
return nil
}
2020-06-23 21:51:25 +00:00
// collectHeaders collects the headers from the blocks between any two tipsets.
//
2020-07-08 20:02:28 +00:00
// `incoming` is the heaviest/projected/target tipset we have learned about, and
// `known` is usually an anchor tipset we already have in our view of the chain
2020-06-23 21:51:25 +00:00
// (which could be the genesis).
//
// collectHeaders checks if portions of the chain are in our ChainStore; falling
// down to the network to retrieve the missing parts. If during the process, any
// portion we receive is in our denylist (bad list), we short-circuit.
//
// {hint/usage}: This is used by collectChain, which is in turn called from the
// main Sync method (Syncer#Sync), so it's a pretty central method.
//
// {hint/logic}: The logic of this method is as follows:
//
// 1. Check that the from tipset is not linked to a parent block known to be
// bad.
// 2. Check the consistency of beacon entries in the from tipset. We check
// total equality of the BeaconEntries in each block.
2020-07-08 20:02:28 +00:00
// 3. Traverse the chain backwards, for each tipset:
2022-08-29 14:25:30 +00:00
// 3a. Load it from the chainstore; if found, move on to its parent.
// 3b. Query our peers via client in batches, requesting up to a
// maximum of 500 tipsets every time.
2020-06-23 21:51:25 +00:00
//
// Once we've concluded, if we find a mismatching tipset at the height where the
// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork
// to resolve it. Refer to the godocs there.
//
// All throughout the process, we keep checking if the received blocks are in
// the deny list, and short-circuit the process if so.
2021-04-28 19:49:21 +00:00
func ( syncer * Syncer ) collectHeaders ( ctx context . Context , incoming * types . TipSet , known * types . TipSet , ignoreCheckpoint bool ) ( [ ] * types . TipSet , error ) {
2019-10-12 09:44:56 +00:00
ctx , span := trace . StartSpan ( ctx , "collectHeaders" )
defer span . End ( )
2019-11-15 21:35:29 +00:00
ss := extractSyncState ( ctx )
2019-10-13 01:05:43 +00:00
span . AddAttributes (
2020-07-10 19:31:58 +00:00
trace . Int64Attribute ( "incomingHeight" , int64 ( incoming . Height ( ) ) ) ,
trace . Int64Attribute ( "knownHeight" , int64 ( known . Height ( ) ) ) ,
2019-10-13 01:05:43 +00:00
)
2019-10-12 09:44:56 +00:00
2020-06-23 21:51:25 +00:00
// Check if the parents of the from block are in the denylist.
// i.e. if a fork of the chain has been requested that we know to be bad.
2020-07-08 20:02:28 +00:00
for _ , pcid := range incoming . Parents ( ) . Cids ( ) {
2020-02-12 07:21:11 +00:00
if reason , ok := syncer . bad . Has ( pcid ) ; ok {
2020-07-06 17:23:29 +00:00
newReason := reason . Linked ( "linked to %s" , pcid )
2020-07-08 20:02:28 +00:00
for _ , b := range incoming . Cids ( ) {
2020-07-06 17:23:29 +00:00
syncer . bad . Add ( b , newReason )
}
2020-07-08 20:02:28 +00:00
return nil , xerrors . Errorf ( "chain linked to block marked previously as bad (%s, %s) (reason: %s)" , incoming . Cids ( ) , pcid , reason )
2019-11-10 23:06:06 +00:00
}
}
2020-04-06 12:47:14 +00:00
{
2020-04-07 18:23:16 +00:00
// ensure consistency of beacon entires
2020-07-08 20:02:28 +00:00
targetBE := incoming . Blocks ( ) [ 0 ] . BeaconEntries
2020-04-14 14:09:11 +00:00
sorted := sort . SliceIsSorted ( targetBE , func ( i , j int ) bool {
return targetBE [ i ] . Round < targetBE [ j ] . Round
} )
if ! sorted {
2020-07-08 20:02:28 +00:00
syncer . bad . Add ( incoming . Cids ( ) [ 0 ] , NewBadBlockReason ( incoming . Cids ( ) , "wrong order of beacon entires" ) )
2020-04-14 14:09:11 +00:00
return nil , xerrors . Errorf ( "wrong order of beacon entires" )
2020-04-07 18:23:16 +00:00
}
2020-04-06 12:47:14 +00:00
2020-07-08 20:02:28 +00:00
for _ , bh := range incoming . Blocks ( ) [ 1 : ] {
2020-04-06 12:47:14 +00:00
if len ( targetBE ) != len ( bh . BeaconEntries ) {
2020-04-07 18:23:16 +00:00
// cannot mark bad, I think @Kubuxu
2020-04-06 12:47:14 +00:00
return nil , xerrors . Errorf ( "tipset contained different number for beacon entires" )
}
for i , be := range bh . BeaconEntries {
2020-04-30 18:39:37 +00:00
if targetBE [ i ] . Round != be . Round || ! bytes . Equal ( targetBE [ i ] . Data , be . Data ) {
2020-04-07 18:23:16 +00:00
// cannot mark bad, I think @Kubuxu
2020-04-14 14:09:11 +00:00
return nil , xerrors . Errorf ( "tipset contained different beacon entires" )
2020-04-06 12:47:14 +00:00
}
}
}
}
2020-07-08 20:02:28 +00:00
blockSet := [ ] * types . TipSet { incoming }
2019-07-05 14:29:17 +00:00
2020-10-08 20:49:36 +00:00
// Parent of the new (possibly better) tipset that we need to fetch next.
2020-07-08 20:02:28 +00:00
at := incoming . Parents ( )
2019-07-05 14:29:17 +00:00
2020-10-08 20:49:36 +00:00
// we want to sync all the blocks until the height above our
// best tipset so far
2020-07-08 20:02:28 +00:00
untilHeight := known . Height ( ) + 1
2019-08-02 22:21:46 +00:00
2019-11-15 21:35:29 +00:00
ss . SetHeight ( blockSet [ len ( blockSet ) - 1 ] . Height ( ) )
2019-10-23 08:18:07 +00:00
2019-11-11 22:37:34 +00:00
var acceptedBlocks [ ] cid . Cid
2019-10-23 08:18:07 +00:00
loop :
2019-08-02 22:21:46 +00:00
for blockSet [ len ( blockSet ) - 1 ] . Height ( ) > untilHeight {
2019-12-16 19:22:56 +00:00
for _ , bc := range at . Cids ( ) {
2020-02-12 07:21:11 +00:00
if reason , ok := syncer . bad . Has ( bc ) ; ok {
2020-07-06 17:23:29 +00:00
newReason := reason . Linked ( "change contained %s" , bc )
2019-11-11 22:37:34 +00:00
for _ , b := range acceptedBlocks {
2020-07-06 17:23:29 +00:00
syncer . bad . Add ( b , newReason )
2019-11-11 22:37:34 +00:00
}
2020-07-08 20:02:28 +00:00
return nil , xerrors . Errorf ( "chain contained block marked previously as bad (%s, %s) (reason: %s)" , incoming . Cids ( ) , bc , reason )
2019-10-09 08:50:57 +00:00
}
}
2019-10-23 08:18:07 +00:00
// If, for some reason, we have a suffix of the chain locally, handle that here
2021-12-11 21:03:00 +00:00
ts , err := syncer . store . LoadTipSet ( ctx , at )
2019-10-23 08:18:07 +00:00
if err == nil {
2019-12-16 19:22:56 +00:00
acceptedBlocks = append ( acceptedBlocks , at . Cids ( ) ... )
2019-11-11 22:37:34 +00:00
2019-10-23 08:18:07 +00:00
blockSet = append ( blockSet , ts )
at = ts . Parents ( )
continue
}
2022-06-28 11:09:59 +00:00
if ! ipld . IsNotFound ( err ) {
2020-11-24 11:09:48 +00:00
log . Warnf ( "loading local tipset: %s" , err )
2019-07-31 07:13:49 +00:00
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
// NB: GetBlocks validates that the blocks are in-fact the ones we
2020-06-23 21:51:25 +00:00
// requested, and that they are correctly linked to one another. It does
// not validate any state transitions.
2019-09-26 07:22:45 +00:00
window := 500
2019-09-09 20:03:10 +00:00
if gap := int ( blockSet [ len ( blockSet ) - 1 ] . Height ( ) - untilHeight ) ; gap < window {
window = gap
}
2020-09-07 18:31:43 +00:00
blks , err := syncer . Exchange . GetBlocks ( ctx , at , window )
2019-07-31 07:13:49 +00:00
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
2019-07-26 18:16:57 +00:00
2019-10-03 18:20:29 +00:00
log . Errorf ( "failed to get blocks: %+v" , err )
2019-07-26 18:16:57 +00:00
2019-11-09 20:14:40 +00:00
span . AddAttributes ( trace . StringAttribute ( "error" , err . Error ( ) ) )
2019-07-31 07:13:49 +00:00
// This error will only be logged above,
return nil , xerrors . Errorf ( "failed to get blocks: %w" , err )
2019-07-30 13:55:36 +00:00
}
2019-09-26 07:22:45 +00:00
log . Info ( "Got blocks: " , blks [ 0 ] . Height ( ) , len ( blks ) )
2019-07-26 18:16:57 +00:00
2020-08-04 18:06:30 +00:00
// Check that the fetched segment of the chain matches what we already
// have. Since we fetch from the head backwards our reassembled chain
// is sorted in reverse here: we have a child -> parent order, our last
// tipset then should be child of the first tipset retrieved.
2020-09-07 18:31:43 +00:00
// FIXME: The reassembly logic should be part of the `client`
2020-08-04 18:06:30 +00:00
// service, the consumer should not be concerned with the
// `MaxRequestLength` limitation, it should just be able to request
// an segment of arbitrary length. The same burden is put on
// `syncFork()` which needs to be aware this as well.
if blockSet [ len ( blockSet ) - 1 ] . IsChildOf ( blks [ 0 ] ) == false {
return nil , xerrors . Errorf ( "retrieved segments of the chain are not connected at heights %d/%d" ,
blockSet [ len ( blockSet ) - 1 ] . Height ( ) , blks [ 0 ] . Height ( ) )
// A successful `GetBlocks()` call is guaranteed to fetch at least
2021-09-02 16:07:23 +00:00
// one tipset so the access `blks[0]` is safe.
2020-08-04 18:06:30 +00:00
}
2019-07-31 07:13:49 +00:00
for _ , b := range blks {
2019-08-02 22:21:46 +00:00
if b . Height ( ) < untilHeight {
2019-09-06 20:03:28 +00:00
break loop
2019-08-02 22:21:46 +00:00
}
2019-10-09 08:50:57 +00:00
for _ , bc := range b . Cids ( ) {
2020-02-12 07:21:11 +00:00
if reason , ok := syncer . bad . Has ( bc ) ; ok {
2020-07-06 17:23:29 +00:00
newReason := reason . Linked ( "change contained %s" , bc )
2019-11-11 22:37:34 +00:00
for _ , b := range acceptedBlocks {
2020-07-06 17:23:29 +00:00
syncer . bad . Add ( b , newReason )
2019-11-11 22:37:34 +00:00
}
2020-07-08 20:02:28 +00:00
return nil , xerrors . Errorf ( "chain contained block marked previously as bad (%s, %s) (reason: %s)" , incoming . Cids ( ) , bc , reason )
2019-10-09 08:50:57 +00:00
}
}
2019-07-31 07:13:49 +00:00
blockSet = append ( blockSet , b )
}
2019-07-26 18:16:57 +00:00
2019-12-16 19:22:56 +00:00
acceptedBlocks = append ( acceptedBlocks , at . Cids ( ) ... )
2019-11-11 22:37:34 +00:00
2019-11-15 21:35:29 +00:00
ss . SetHeight ( blks [ len ( blks ) - 1 ] . Height ( ) )
2019-07-31 07:13:49 +00:00
at = blks [ len ( blks ) - 1 ] . Parents ( )
}
2019-07-26 16:13:25 +00:00
2020-08-07 21:01:28 +00:00
base := blockSet [ len ( blockSet ) - 1 ]
2020-10-23 17:30:42 +00:00
if base . Equals ( known ) {
blockSet = blockSet [ : len ( blockSet ) - 1 ]
base = blockSet [ len ( blockSet ) - 1 ]
}
2020-10-08 20:49:36 +00:00
if base . IsChildOf ( known ) {
// common case: receiving blocks that are building on top of our best tipset
2020-08-07 21:01:28 +00:00
return blockSet , nil
}
2019-09-08 20:14:01 +00:00
2021-12-11 21:03:00 +00:00
knownParent , err := syncer . store . LoadTipSet ( ctx , known . Parents ( ) )
2020-10-08 20:49:36 +00:00
if err != nil {
return nil , xerrors . Errorf ( "failed to load next local tipset: %w" , err )
}
if base . IsChildOf ( knownParent ) {
// common case: receiving a block thats potentially part of the same tipset as our best block
2020-08-07 21:01:28 +00:00
return blockSet , nil
}
2020-06-23 21:51:25 +00:00
2020-08-07 21:01:28 +00:00
// We have now ascertained that this is *not* a 'fast forward'
log . Warnf ( "(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)" , incoming . Cids ( ) , incoming . Height ( ) , known . Cids ( ) , known . Height ( ) )
2021-04-28 19:49:21 +00:00
fork , err := syncer . syncFork ( ctx , base , known , ignoreCheckpoint )
2020-08-07 21:01:28 +00:00
if err != nil {
2020-09-09 05:14:01 +00:00
if xerrors . Is ( err , ErrForkTooLong ) || xerrors . Is ( err , ErrForkCheckpoint ) {
2020-08-07 21:01:28 +00:00
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log . Warn ( "adding forked chain to our bad tipset cache" )
for _ , b := range incoming . Blocks ( ) {
syncer . bad . Add ( b . Cid ( ) , NewBadBlockReason ( incoming . Cids ( ) , "fork past finality" ) )
2019-11-10 23:06:06 +00:00
}
2019-10-06 00:51:48 +00:00
}
2020-08-07 21:01:28 +00:00
return nil , xerrors . Errorf ( "failed to sync fork: %w" , err )
2019-07-31 07:13:49 +00:00
}
2019-07-26 18:16:57 +00:00
2020-08-07 21:01:28 +00:00
blockSet = append ( blockSet , fork ... )
2019-07-31 07:13:49 +00:00
return blockSet , nil
}
2019-07-26 18:16:57 +00:00
2019-11-10 23:06:06 +00:00
// Sentinel errors returned by syncFork. Callers match them with xerrors.Is.
var (
	// ErrForkTooLong signals that no common ancestor was found within the
	// fork length threshold.
	ErrForkTooLong = fmt.Errorf("fork longer than threshold")

	// ErrForkCheckpoint signals that following the fork would mean abandoning
	// a checkpointed tipset.
	ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")
)
2019-11-10 23:06:06 +00:00
2020-06-23 21:51:25 +00:00
// syncFork tries to obtain the chain fragment that links a fork into a common
// ancestor in our view of the chain.
//
2020-09-09 05:14:01 +00:00
// If the fork is too long (build.ForkLengthThreshold), or would cause us to diverge from the checkpoint (ErrForkCheckpoint),
// we add the entire subchain to the denylist. Else, we find the common ancestor, and add the missing chain
2020-06-23 21:51:25 +00:00
// fragment until the fork point to the returned []TipSet.
2021-04-28 19:49:21 +00:00
func ( syncer * Syncer ) syncFork ( ctx context . Context , incoming * types . TipSet , known * types . TipSet , ignoreCheckpoint bool ) ( [ ] * types . TipSet , error ) {
2020-09-09 18:33:00 +00:00
2021-04-28 19:49:21 +00:00
var chkpt * types . TipSet
if ! ignoreCheckpoint {
chkpt = syncer . store . GetCheckpoint ( )
if known . Equals ( chkpt ) {
return nil , ErrForkCheckpoint
}
2020-09-09 18:33:00 +00:00
}
2020-12-22 15:29:36 +00:00
// TODO: Does this mean we always ask for ForkLengthThreshold blocks from the network, even if we just need, like, 2? Yes.
2020-09-09 05:14:01 +00:00
// Would it not be better to ask in smaller chunks, given that an ~ForkLengthThreshold is very rare?
2020-09-07 18:31:43 +00:00
tips , err := syncer . Exchange . GetBlocks ( ctx , incoming . Parents ( ) , int ( build . ForkLengthThreshold ) )
2019-10-06 00:51:48 +00:00
if err != nil {
return nil , err
}
2021-12-11 21:03:00 +00:00
nts , err := syncer . store . LoadTipSet ( ctx , known . Parents ( ) )
2019-10-06 00:51:48 +00:00
if err != nil {
return nil , xerrors . Errorf ( "failed to load next local tipset: %w" , err )
}
2020-12-22 15:29:36 +00:00
// Track the fork length on our side of the synced chain to enforce
// `ForkLengthThreshold`. Initialized to 1 because we already walked back
// one tipset from `known` (our synced head).
forkLengthInHead := 1
2019-10-06 00:51:48 +00:00
for cur := 0 ; cur < len ( tips ) ; {
2019-12-10 21:37:51 +00:00
if nts . Height ( ) == 0 {
if ! syncer . Genesis . Equals ( nts ) {
return nil , xerrors . Errorf ( "somehow synced chain that linked back to a different genesis (bad genesis: %s)" , nts . Key ( ) )
}
2020-08-07 18:36:24 +00:00
return nil , xerrors . Errorf ( "synced chain forked at genesis, refusing to sync; incoming: %s" , incoming . Cids ( ) )
2019-12-10 21:37:51 +00:00
}
2019-10-06 00:51:48 +00:00
if nts . Equals ( tips [ cur ] ) {
return tips [ : cur + 1 ] , nil
}
if nts . Height ( ) < tips [ cur ] . Height ( ) {
cur ++
} else {
2020-12-22 15:29:36 +00:00
// Walk back one block in our synced chain to try to meet the fork's
// height.
forkLengthInHead ++
if forkLengthInHead > int ( build . ForkLengthThreshold ) {
return nil , ErrForkTooLong
}
2020-09-09 18:33:00 +00:00
// We will be forking away from nts, check that it isn't checkpointed
2021-04-28 19:49:21 +00:00
if nts . Equals ( chkpt ) {
2020-09-09 18:33:00 +00:00
return nil , ErrForkCheckpoint
}
2021-12-11 21:03:00 +00:00
nts , err = syncer . store . LoadTipSet ( ctx , nts . Parents ( ) )
2019-10-06 00:51:48 +00:00
if err != nil {
return nil , xerrors . Errorf ( "loading next local tipset: %w" , err )
}
}
}
2020-09-09 05:14:01 +00:00
2019-11-10 23:06:06 +00:00
return nil , ErrForkTooLong
2019-10-06 00:51:48 +00:00
}
2019-10-10 11:13:26 +00:00
func ( syncer * Syncer ) syncMessagesAndCheckState ( ctx context . Context , headers [ ] * types . TipSet ) error {
2019-11-15 21:35:29 +00:00
ss := extractSyncState ( ctx )
2020-10-13 00:14:16 +00:00
ss . SetHeight ( headers [ len ( headers ) - 1 ] . Height ( ) )
2019-10-23 14:45:03 +00:00
2019-10-12 09:44:56 +00:00
return syncer . iterFullTipsets ( ctx , headers , func ( ctx context . Context , fts * store . FullTipSet ) error {
2019-10-03 18:20:29 +00:00
log . Debugw ( "validating tipset" , "height" , fts . TipSet ( ) . Height ( ) , "size" , len ( fts . TipSet ( ) . Cids ( ) ) )
2020-09-30 05:39:06 +00:00
if err := syncer . ValidateTipSet ( ctx , fts , true ) ; err != nil {
2019-10-03 18:20:29 +00:00
log . Errorf ( "failed to validate tipset: %+v" , err )
2019-08-02 00:13:57 +00:00
return xerrors . Errorf ( "message processing failed: %w" , err )
}
2019-09-06 20:03:28 +00:00
2020-03-02 00:26:09 +00:00
stats . Record ( ctx , metrics . ChainNodeWorkerHeight . M ( int64 ( fts . TipSet ( ) . Height ( ) ) ) )
2019-11-15 21:35:29 +00:00
ss . SetHeight ( fts . TipSet ( ) . Height ( ) )
2019-09-30 21:06:47 +00:00
2019-08-02 00:13:57 +00:00
return nil
} )
}
// fills out each of the given tipsets with messages and calls the callback with it
2019-10-12 09:44:56 +00:00
func ( syncer * Syncer ) iterFullTipsets ( ctx context . Context , headers [ ] * types . TipSet , cb func ( context . Context , * store . FullTipSet ) error ) error {
2020-09-08 18:42:20 +00:00
ss := extractSyncState ( ctx )
2019-10-12 09:44:56 +00:00
ctx , span := trace . StartSpan ( ctx , "iterFullTipsets" )
defer span . End ( )
2019-12-17 22:15:51 +00:00
span . AddAttributes ( trace . Int64Attribute ( "num_headers" , int64 ( len ( headers ) ) ) )
2019-10-23 08:18:07 +00:00
for i := len ( headers ) - 1 ; i >= 0 ; {
2021-12-17 09:42:09 +00:00
fts , err := syncer . store . TryFillTipSet ( ctx , headers [ i ] )
2019-08-02 00:13:57 +00:00
if err != nil {
return err
}
2019-10-23 08:18:07 +00:00
if fts != nil {
if err := cb ( ctx , fts ) ; err != nil {
return err
}
i --
continue
2019-08-02 00:13:57 +00:00
}
2019-07-26 18:16:57 +00:00
2020-09-16 18:04:44 +00:00
batchSize := concurrentSyncRequests * syncRequestBatchSize
2019-08-02 22:21:46 +00:00
if i < batchSize {
2020-09-21 06:21:25 +00:00
batchSize = i + 1
2019-07-30 13:55:36 +00:00
}
2019-07-26 16:13:25 +00:00
2020-09-08 18:42:20 +00:00
ss . SetStage ( api . StageFetchingMessages )
2020-09-17 14:21:26 +00:00
startOffset := i + 1 - batchSize
bstout , batchErr := syncer . fetchMessages ( ctx , headers [ startOffset : startOffset + batchSize ] , startOffset )
2020-09-08 18:42:20 +00:00
ss . SetStage ( api . StageMessages )
2020-04-01 18:35:09 +00:00
2020-09-16 18:04:44 +00:00
if batchErr != nil {
2020-10-09 20:59:56 +00:00
return xerrors . Errorf ( "failed to fetch messages: %w" , batchErr )
2019-07-31 07:13:49 +00:00
}
2019-07-30 13:55:36 +00:00
2020-04-01 18:35:09 +00:00
for bsi := 0 ; bsi < len ( bstout ) ; bsi ++ {
2019-09-06 20:03:28 +00:00
// temp storage so we don't persist data we dont want to
2021-01-29 20:01:00 +00:00
bs := bstore . NewMemory ( )
2020-02-05 02:26:42 +00:00
blks := cbor . NewCborStore ( bs )
2019-09-06 20:03:28 +00:00
2019-08-02 22:21:46 +00:00
this := headers [ i - bsi ]
2020-04-01 18:35:09 +00:00
bstip := bstout [ len ( bstout ) - ( bsi + 1 ) ]
2020-07-27 15:31:36 +00:00
fts , err := zipTipSetAndMessages ( blks , this , bstip . Bls , bstip . Secpk , bstip . BlsIncludes , bstip . SecpkIncludes )
2019-07-30 13:55:36 +00:00
if err != nil {
2019-10-03 18:20:29 +00:00
log . Warnw ( "zipping failed" , "error" , err , "bsi" , bsi , "i" , i ,
2020-07-27 15:31:36 +00:00
"height" , this . Height ( ) ,
2020-04-01 18:35:09 +00:00
"next-height" , i + batchSize )
2019-07-31 07:13:49 +00:00
return xerrors . Errorf ( "message processing failed: %w" , err )
2019-07-30 13:55:36 +00:00
}
2019-07-26 18:16:57 +00:00
2019-10-12 09:44:56 +00:00
if err := cb ( ctx , fts ) ; err != nil {
2019-08-02 00:13:57 +00:00
return err
2019-07-30 13:55:36 +00:00
}
2019-07-26 18:16:57 +00:00
2020-09-24 01:53:28 +00:00
if err := persistMessages ( ctx , bs , bstip ) ; err != nil {
2019-09-06 20:03:28 +00:00
return err
}
2019-07-26 16:13:25 +00:00
2021-02-28 22:48:36 +00:00
if err := copyBlockstore ( ctx , bs , syncer . store . ChainBlockstore ( ) ) ; err != nil {
2019-09-06 20:03:28 +00:00
return xerrors . Errorf ( "message processing failed: %w" , err )
}
2019-07-05 14:29:17 +00:00
}
2020-08-28 19:03:06 +00:00
2020-08-28 20:09:48 +00:00
i -= batchSize
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2019-07-31 07:13:49 +00:00
return nil
}
2019-07-05 14:29:17 +00:00
2021-09-02 16:07:23 +00:00
// checkMsgMeta verifies that a set of fetched messages matches what the block
// headers of ts commit to.
//
// allbmsgs and allsmsgs hold the BLS and secp256k1 messages for the tipset.
// bmi[i] and smi[i] list, for the i-th block of ts, the indices into those
// slices of the messages that block includes.
//
// For every block it checks that
//   - the number of included messages does not exceed
//     build.BlockMessageLimit, and
//   - the message root computed from the included message CIDs equals the
//     Messages field of the block header.
//
// The include tables and every index in them are bounds-checked first, so
// malformed input produces an error rather than an index-out-of-range panic.
func checkMsgMeta(ts *types.TipSet, allbmsgs []*types.Message, allsmsgs []*types.SignedMessage, bmi, smi [][]uint64) error {
	blks := ts.Blocks()
	if len(bmi) < len(blks) || len(smi) < len(blks) {
		return fmt.Errorf("message include tables too short for ts %s: %d blocks, %d bls entries, %d secpk entries",
			ts.Key(), len(blks), len(bmi), len(smi))
	}

	for bi, b := range blks {
		blsIdx, secpIdx := bmi[bi], smi[bi]

		if msgc := len(blsIdx) + len(secpIdx); msgc > build.BlockMessageLimit {
			return fmt.Errorf("block %q has too many messages (%d)", b.Cid(), msgc)
		}

		var smsgCids []cid.Cid
		for _, m := range secpIdx {
			if m >= uint64(len(allsmsgs)) {
				return fmt.Errorf("block %q references secpk message %d, but only %d are present", b.Cid(), m, len(allsmsgs))
			}
			smsgCids = append(smsgCids, allsmsgs[m].Cid())
		}

		var bmsgCids []cid.Cid
		for _, m := range blsIdx {
			if m >= uint64(len(allbmsgs)) {
				return fmt.Errorf("block %q references bls message %d, but only %d are present", b.Cid(), m, len(allbmsgs))
			}
			bmsgCids = append(bmsgCids, allbmsgs[m].Cid())
		}

		// The root is computed in a scratch store; nothing is persisted here.
		mrcid, err := computeMsgMeta(cbor.NewCborStore(bstore.NewMemory()), bmsgCids, smsgCids)
		if err != nil {
			return err
		}

		if b.Messages != mrcid {
			return fmt.Errorf("messages didnt match message root in header for ts %s", ts.Key())
		}
	}

	return nil
}
2020-09-17 14:21:26 +00:00
func ( syncer * Syncer ) fetchMessages ( ctx context . Context , headers [ ] * types . TipSet , startOffset int ) ( [ ] * exchange . CompactedMessages , error ) {
batchSize := len ( headers )
batch := make ( [ ] * exchange . CompactedMessages , batchSize )
var wg sync . WaitGroup
var mx sync . Mutex
var batchErr error
start := build . Clock . Now ( )
for j := 0 ; j < batchSize ; j += syncRequestBatchSize {
wg . Add ( 1 )
go func ( j int ) {
defer wg . Done ( )
nreq := syncRequestBatchSize
if j + nreq > batchSize {
nreq = batchSize - j
2020-08-28 20:18:06 +00:00
}
2020-09-17 14:21:26 +00:00
failed := false
for offset := 0 ; ! failed && offset < nreq ; {
nextI := j + offset
2020-09-21 15:58:52 +00:00
lastI := j + nreq
2020-09-17 14:21:26 +00:00
var requestErr error
var requestResult [ ] * exchange . CompactedMessages
for retry := 0 ; requestResult == nil && retry < syncRequestRetries ; retry ++ {
if retry > 0 {
log . Infof ( "fetching messages at %d (retry %d)" , startOffset + nextI , retry )
} else {
log . Infof ( "fetching messages at %d" , startOffset + nextI )
}
2020-09-21 15:58:52 +00:00
result , err := syncer . Exchange . GetChainMessages ( ctx , headers [ nextI : lastI ] )
2020-09-17 14:21:26 +00:00
if err != nil {
requestErr = multierror . Append ( requestErr , err )
} else {
2021-09-02 16:07:23 +00:00
isGood := true
for index , ts := range headers [ nextI : lastI ] {
cm := result [ index ]
if err := checkMsgMeta ( ts , cm . Bls , cm . Secpk , cm . BlsIncludes , cm . SecpkIncludes ) ; err != nil {
log . Errorf ( "fetched messages not as expected: %s" , err )
isGood = false
break
}
}
if isGood {
requestResult = result
}
2020-09-17 14:21:26 +00:00
}
}
mx . Lock ( )
if requestResult != nil {
copy ( batch [ j + offset : ] , requestResult )
offset += len ( requestResult )
} else {
log . Errorf ( "error fetching messages at %d: %s" , nextI , requestErr )
batchErr = multierror . Append ( batchErr , requestErr )
failed = true
}
mx . Unlock ( )
2020-08-28 20:18:06 +00:00
}
2020-09-17 14:21:26 +00:00
} ( j )
}
wg . Wait ( )
2020-08-28 20:09:48 +00:00
2020-09-17 14:35:40 +00:00
if batchErr != nil {
return nil , batchErr
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2020-09-17 14:35:40 +00:00
log . Infof ( "fetching messages for %d tipsets at %d done; took %s" , batchSize , startOffset , build . Clock . Since ( start ) )
2020-08-28 19:52:40 +00:00
2020-09-17 15:23:50 +00:00
return batch , nil
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2020-09-24 01:53:28 +00:00
func persistMessages ( ctx context . Context , bs bstore . Blockstore , bst * exchange . CompactedMessages ) error {
_ , span := trace . StartSpan ( ctx , "persistMessages" )
defer span . End ( )
2020-07-27 15:31:36 +00:00
for _ , m := range bst . Bls {
2019-09-06 20:03:28 +00:00
//log.Infof("putting BLS message: %s", m.Cid())
2021-12-17 09:42:09 +00:00
if _ , err := store . PutMessage ( ctx , bs , m ) ; err != nil {
2019-10-03 18:20:29 +00:00
log . Errorf ( "failed to persist messages: %+v" , err )
2019-09-06 20:03:28 +00:00
return xerrors . Errorf ( "BLS message processing failed: %w" , err )
2019-08-01 20:40:47 +00:00
}
2019-09-06 20:03:28 +00:00
}
2020-07-27 15:31:36 +00:00
for _ , m := range bst . Secpk {
2022-10-13 15:03:18 +00:00
if m . Signature . Type != crypto . SigTypeSecp256k1 {
2020-02-12 23:52:36 +00:00
return xerrors . Errorf ( "unknown signature type on message %s: %q" , m . Cid ( ) , m . Signature . Type )
2019-09-06 20:03:28 +00:00
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
2021-12-17 09:42:09 +00:00
if _ , err := store . PutMessage ( ctx , bs , m ) ; err != nil {
2019-10-03 18:20:29 +00:00
log . Errorf ( "failed to persist messages: %+v" , err )
2019-09-06 20:03:28 +00:00
return xerrors . Errorf ( "secp256k1 message processing failed: %w" , err )
2019-08-02 00:13:57 +00:00
}
}
return nil
}
2020-06-23 21:51:25 +00:00
// collectChain tries to advance our view of the chain to the purported head.
//
// It goes through various stages:
//
// 1. StageHeaders: we proceed in the sync process by requesting block headers
// from our peers, moving back from their heads, until we reach a tipset
// that we have in common (such a common tipset must exist, thought it may
// simply be the genesis block).
//
// If the common tipset is our head, we treat the sync as a "fast-forward",
// else we must drop part of our chain to connect to the peer's head
// (referred to as "forking").
//
2022-08-29 14:25:30 +00:00
// 2. StagePersistHeaders: now that we've collected the missing headers,
2020-06-23 21:51:25 +00:00
// augmented by those on the other side of a fork, we persist them to the
// BlockStore.
//
// 3. StageMessages: having acquired the headers and found a common tipset,
// we then move forward, requesting the full blocks, including the messages.
2021-04-28 19:49:21 +00:00
func ( syncer * Syncer ) collectChain ( ctx context . Context , ts * types . TipSet , hts * types . TipSet , ignoreCheckpoint bool ) error {
2019-10-12 09:44:56 +00:00
ctx , span := trace . StartSpan ( ctx , "collectChain" )
defer span . End ( )
2019-11-15 21:35:29 +00:00
ss := extractSyncState ( ctx )
2019-10-12 09:44:56 +00:00
2020-11-02 21:56:26 +00:00
ss . Init ( hts , ts )
2019-09-30 21:06:47 +00:00
2021-04-28 19:49:21 +00:00
headers , err := syncer . collectHeaders ( ctx , ts , hts , ignoreCheckpoint )
2019-07-31 07:13:49 +00:00
if err != nil {
2019-12-04 03:56:29 +00:00
ss . Error ( err )
2019-07-31 07:13:49 +00:00
return err
}
2019-11-09 20:14:40 +00:00
span . AddAttributes ( trace . Int64Attribute ( "syncChainLength" , int64 ( len ( headers ) ) ) )
2019-10-10 03:04:10 +00:00
if ! headers [ 0 ] . Equals ( ts ) {
log . Errorf ( "collectChain headers[0] should be equal to sync target. Its not: %s != %s" , headers [ 0 ] . Cids ( ) , ts . Cids ( ) )
}
2019-11-15 21:35:29 +00:00
ss . SetStage ( api . StagePersistHeaders )
2019-09-30 21:06:47 +00:00
2020-05-12 17:58:12 +00:00
toPersist := make ( [ ] * types . BlockHeader , 0 , len ( headers ) * int ( build . BlocksPerEpoch ) )
2019-07-31 07:13:49 +00:00
for _ , ts := range headers {
2019-11-12 10:18:46 +00:00
toPersist = append ( toPersist , ts . Blocks ( ) ... )
}
2021-12-11 21:03:00 +00:00
if err := syncer . store . PersistBlockHeaders ( ctx , toPersist ... ) ; err != nil {
2019-12-04 03:56:29 +00:00
err = xerrors . Errorf ( "failed to persist synced blocks to the chainstore: %w" , err )
ss . Error ( err )
return err
2019-07-05 14:29:17 +00:00
}
2019-11-12 10:18:46 +00:00
toPersist = nil
2019-07-05 14:29:17 +00:00
2019-11-15 21:35:29 +00:00
ss . SetStage ( api . StageMessages )
2019-09-30 21:06:47 +00:00
2019-10-10 11:13:26 +00:00
if err := syncer . syncMessagesAndCheckState ( ctx , headers ) ; err != nil {
2019-12-04 03:56:29 +00:00
err = xerrors . Errorf ( "collectChain syncMessages: %w" , err )
ss . Error ( err )
return err
2019-07-31 07:13:49 +00:00
}
2019-11-15 21:35:29 +00:00
ss . SetStage ( api . StageSyncComplete )
2019-11-07 00:18:06 +00:00
log . Debugw ( "new tipset" , "height" , ts . Height ( ) , "tipset" , types . LogCids ( ts . Cids ( ) ) )
2019-09-30 21:06:47 +00:00
2019-07-31 07:13:49 +00:00
return nil
2019-07-05 14:36:08 +00:00
}
2019-09-06 06:26:02 +00:00
2020-10-10 15:31:04 +00:00
func ( syncer * Syncer ) State ( ) [ ] SyncerStateSnapshot {
2020-09-14 20:58:59 +00:00
return syncer . syncmgr . State ( )
2019-09-30 21:06:47 +00:00
}
2019-12-21 06:10:40 +00:00
2020-06-23 21:51:25 +00:00
// MarkBad manually adds a block to the "bad blocks" cache.
2019-12-21 06:10:40 +00:00
func ( syncer * Syncer ) MarkBad ( blk cid . Cid ) {
2020-07-06 17:23:29 +00:00
syncer . bad . Add ( blk , NewBadBlockReason ( [ ] cid . Cid { blk } , "manually marked bad" ) )
2019-12-21 06:10:40 +00:00
}
2020-02-12 07:44:55 +00:00
2020-09-09 07:24:09 +00:00
// UnmarkBad manually removes a block from the "bad blocks" cache.
func ( syncer * Syncer ) UnmarkBad ( blk cid . Cid ) {
syncer . bad . Remove ( blk )
}
2020-10-10 08:26:42 +00:00
// UnmarkAllBad purges the "bad blocks" cache.
func ( syncer * Syncer ) UnmarkAllBad ( ) {
syncer . bad . Purge ( )
}
2020-02-12 07:44:55 +00:00
func ( syncer * Syncer ) CheckBadBlockCache ( blk cid . Cid ) ( string , bool ) {
2020-07-06 17:23:29 +00:00
bbr , ok := syncer . bad . Has ( blk )
return bbr . String ( ) , ok
2019-12-21 06:10:40 +00:00
}