2019-07-05 14:29:17 +00:00
package chain
import (
"context"
"fmt"
"sync"
2019-08-30 02:59:54 +00:00
"time"
2019-07-05 14:29:17 +00:00
2019-08-30 02:59:54 +00:00
"github.com/filecoin-project/go-lotus/build"
2019-07-12 23:52:25 +00:00
"github.com/filecoin-project/go-lotus/chain/actors"
2019-08-16 00:17:09 +00:00
"github.com/filecoin-project/go-lotus/chain/address"
2019-09-06 06:26:02 +00:00
"github.com/filecoin-project/go-lotus/chain/stmgr"
2019-07-26 04:54:22 +00:00
"github.com/filecoin-project/go-lotus/chain/store"
2019-07-12 10:23:05 +00:00
"github.com/filecoin-project/go-lotus/chain/types"
2019-07-26 04:54:22 +00:00
"github.com/filecoin-project/go-lotus/chain/vm"
2019-08-16 00:17:09 +00:00
"github.com/filecoin-project/go-lotus/lib/vdf"
2019-07-08 12:51:45 +00:00
2019-09-17 01:56:37 +00:00
amt "github.com/filecoin-project/go-amt-ipld"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
dstore "github.com/ipfs/go-datastore"
bstore "github.com/ipfs/go-ipfs-blockstore"
2019-07-26 04:54:22 +00:00
logging "github.com/ipfs/go-log"
2019-07-30 16:39:07 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-07-05 14:29:17 +00:00
"github.com/pkg/errors"
2019-09-17 18:09:22 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
"golang.org/x/xerrors"
2019-07-05 14:29:17 +00:00
)
2019-07-26 04:54:22 +00:00
// log is the package-level logger, registered under the subsystem name "chain".
var log = logging . Logger ( "chain" )
2019-07-05 14:29:17 +00:00
type Syncer struct {
// The heaviest known tipset in the network.
// The interface for accessing and putting tipsets into local storage
2019-07-26 04:54:22 +00:00
store * store . ChainStore
2019-07-05 14:29:17 +00:00
2019-09-06 06:26:02 +00:00
// the state manager handles making state queries
sm * stmgr . StateManager
2019-07-05 14:46:21 +00:00
// The known Genesis tipset
2019-07-26 04:54:22 +00:00
Genesis * types . TipSet
2019-07-05 14:29:17 +00:00
syncLock sync . Mutex
// TipSets known to be invalid
bad BadTipSetCache
// handle to the block sync service
2019-07-08 14:07:09 +00:00
Bsync * BlockSync
2019-07-05 14:29:17 +00:00
2019-07-11 02:36:43 +00:00
self peer . ID
2019-07-05 14:29:17 +00:00
// peer heads
// Note: clear cache on disconnects
2019-07-26 04:54:22 +00:00
peerHeads map [ peer . ID ] * types . TipSet
2019-07-05 14:29:17 +00:00
peerHeadsLk sync . Mutex
}
2019-09-06 06:26:02 +00:00
func NewSyncer ( sm * stmgr . StateManager , bsync * BlockSync , self peer . ID ) ( * Syncer , error ) {
gen , err := sm . ChainStore ( ) . GetGenesis ( )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
2019-07-26 04:54:22 +00:00
gent , err := types . NewTipSet ( [ ] * types . BlockHeader { gen } )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
return & Syncer {
2019-07-05 14:46:21 +00:00
Genesis : gent ,
2019-07-08 14:07:09 +00:00
Bsync : bsync ,
2019-07-26 04:54:22 +00:00
peerHeads : make ( map [ peer . ID ] * types . TipSet ) ,
2019-09-06 06:26:02 +00:00
store : sm . ChainStore ( ) ,
sm : sm ,
2019-08-02 22:21:46 +00:00
self : self ,
2019-07-05 14:29:17 +00:00
} , nil
}
// BadTipSetCache is the type of the Syncer's `bad` field, which is described
// there as holding tipsets known to be invalid.
type BadTipSetCache struct {
// badBlocks is keyed by CID with empty-struct values, i.e. it is used as a set.
badBlocks map [ cid . Cid ] struct { }
}
// BootstrapPeerThreshold is not referenced by the code visible in this file.
// NOTE(review): presumably the number of peer heads required before the
// initial sync is started — confirm against its users.
const BootstrapPeerThreshold = 1
// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
2019-07-26 04:54:22 +00:00
func ( syncer * Syncer ) InformNewHead ( from peer . ID , fts * store . FullTipSet ) {
2019-07-05 14:29:17 +00:00
if fts == nil {
panic ( "bad" )
}
2019-07-11 02:36:43 +00:00
if from == syncer . self {
// TODO: this is kindof a hack...
2019-08-27 18:45:21 +00:00
log . Debug ( "got block from ourselves" )
2019-07-11 02:36:43 +00:00
2019-07-30 16:39:07 +00:00
if err := syncer . Sync ( fts ) ; err != nil {
2019-07-11 02:36:43 +00:00
log . Errorf ( "failed to sync our own block: %s" , err )
}
return
}
2019-07-05 14:29:17 +00:00
syncer . peerHeadsLk . Lock ( )
syncer . peerHeads [ from ] = fts . TipSet ( )
syncer . peerHeadsLk . Unlock ( )
2019-07-08 14:07:09 +00:00
syncer . Bsync . AddPeer ( from )
2019-07-05 14:29:17 +00:00
go func ( ) {
2019-07-30 16:39:07 +00:00
if err := syncer . Sync ( fts ) ; err != nil {
2019-07-26 18:16:57 +00:00
log . Errorf ( "sync error: %s" , err )
2019-07-05 14:29:17 +00:00
}
} ( )
}
2019-07-25 22:15:03 +00:00
func ( syncer * Syncer ) InformNewBlock ( from peer . ID , blk * types . FullBlock ) {
2019-07-05 14:29:17 +00:00
// TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { Blocks : [ ] * types . FullBlock { blk } }
2019-07-05 14:29:17 +00:00
syncer . InformNewHead ( from , fts )
}
2019-07-26 04:54:22 +00:00
func reverse ( tips [ ] * types . TipSet ) [ ] * types . TipSet {
out := make ( [ ] * types . TipSet , len ( tips ) )
2019-07-05 14:29:17 +00:00
for i := 0 ; i < len ( tips ) ; i ++ {
out [ i ] = tips [ len ( tips ) - ( i + 1 ) ]
}
return out
}
// copyBlockstore copies every block found in `from` into `to`.
//
// It returns the first error encountered while listing keys, reading a block
// from `from`, or writing it to `to`; blocks copied before the error remain
// in `to`.
func copyBlockstore(from, to bstore.Blockstore) error {
	// Use a cancellable context so that the key enumeration is told to stop
	// when we bail out of the loop early on an error, instead of leaving the
	// producer blocked on a channel nobody reads any more.
	ctx, cancel := context.WithCancel(context.TODO())
	defer cancel()

	cids, err := from.AllKeysChan(ctx)
	if err != nil {
		return err
	}

	for c := range cids {
		b, err := from.Get(c)
		if err != nil {
			return err
		}

		if err := to.Put(b); err != nil {
			return err
		}
	}

	return nil
}
2019-08-01 20:40:47 +00:00
// TODO: this function effectively accepts unchecked input from the network,
// either validate it here, or ensure that its validated elsewhere (maybe make
// sure the blocksync code checks it?)
// maybe this code should actually live in blocksync??
2019-09-17 01:56:37 +00:00
func zipTipSetAndMessages ( bs amt . Blocks , ts * types . TipSet , allbmsgs [ ] * types . Message , allsmsgs [ ] * types . SignedMessage , bmi , smi [ ] [ ] uint64 ) ( * store . FullTipSet , error ) {
2019-08-01 20:40:47 +00:00
if len ( ts . Blocks ( ) ) != len ( smi ) || len ( ts . Blocks ( ) ) != len ( bmi ) {
2019-07-05 14:29:17 +00:00
return nil , fmt . Errorf ( "msgincl length didnt match tipset size" )
}
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { }
2019-07-05 14:29:17 +00:00
for bi , b := range ts . Blocks ( ) {
2019-08-01 20:40:47 +00:00
var smsgs [ ] * types . SignedMessage
2019-09-17 01:56:37 +00:00
var smsgCids [ ] cbg . CBORMarshaler
2019-08-01 20:40:47 +00:00
for _ , m := range smi [ bi ] {
smsgs = append ( smsgs , allsmsgs [ m ] )
2019-09-17 01:56:37 +00:00
c := cbg . CborCid ( allsmsgs [ m ] . Cid ( ) )
smsgCids = append ( smsgCids , & c )
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
smroot , err := amt . FromArray ( bs , smsgCids )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
2019-08-01 20:40:47 +00:00
var bmsgs [ ] * types . Message
2019-09-17 01:56:37 +00:00
var bmsgCids [ ] cbg . CBORMarshaler
2019-08-01 20:40:47 +00:00
for _ , m := range bmi [ bi ] {
bmsgs = append ( bmsgs , allbmsgs [ m ] )
2019-09-17 01:56:37 +00:00
c := cbg . CborCid ( allbmsgs [ m ] . Cid ( ) )
bmsgCids = append ( bmsgCids , & c )
2019-08-01 20:40:47 +00:00
}
2019-09-17 01:56:37 +00:00
bmroot , err := amt . FromArray ( bs , bmsgCids )
2019-08-01 20:40:47 +00:00
if err != nil {
return nil , err
}
2019-09-17 01:56:37 +00:00
mrcid , err := bs . Put ( & types . MsgMeta {
2019-08-01 20:40:47 +00:00
BlsMessages : bmroot ,
SecpkMessages : smroot ,
} )
if err != nil {
return nil , err
}
if b . Messages != mrcid {
2019-07-05 14:29:17 +00:00
return nil , fmt . Errorf ( "messages didnt match message root in header" )
}
2019-07-25 22:15:03 +00:00
fb := & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : b ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-07-05 14:29:17 +00:00
}
fts . Blocks = append ( fts . Blocks , fb )
}
return fts , nil
}
2019-07-26 04:54:22 +00:00
func ( syncer * Syncer ) selectHead ( heads map [ peer . ID ] * types . TipSet ) ( * types . TipSet , error ) {
var headsArr [ ] * types . TipSet
2019-07-05 14:29:17 +00:00
for _ , ts := range heads {
headsArr = append ( headsArr , ts )
}
sel := headsArr [ 0 ]
for i := 1 ; i < len ( headsArr ) ; i ++ {
cur := headsArr [ i ]
yes , err := syncer . store . IsAncestorOf ( cur , sel )
if err != nil {
return nil , err
}
if yes {
continue
}
yes , err = syncer . store . IsAncestorOf ( sel , cur )
if err != nil {
return nil , err
}
if yes {
sel = cur
continue
}
nca , err := syncer . store . NearestCommonAncestor ( cur , sel )
if err != nil {
return nil , err
}
2019-09-03 17:45:55 +00:00
if sel . Height ( ) - nca . Height ( ) > build . ForkLengthThreshold {
2019-07-05 14:29:17 +00:00
// TODO: handle this better than refusing to sync
return nil , fmt . Errorf ( "Conflict exists in heads set" )
}
if syncer . store . Weight ( cur ) > syncer . store . Weight ( sel ) {
sel = cur
}
}
return sel , nil
}
2019-07-26 04:54:22 +00:00
func ( syncer * Syncer ) FetchTipSet ( ctx context . Context , p peer . ID , cids [ ] cid . Cid ) ( * store . FullTipSet , error ) {
2019-07-05 14:29:17 +00:00
if fts , err := syncer . tryLoadFullTipSet ( cids ) ; err == nil {
return fts , nil
}
2019-07-08 14:07:09 +00:00
return syncer . Bsync . GetFullTipSet ( ctx , p , cids )
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
func ( syncer * Syncer ) tryLoadFullTipSet ( cids [ ] cid . Cid ) ( * store . FullTipSet , error ) {
2019-07-05 14:29:17 +00:00
ts , err := syncer . store . LoadTipSet ( cids )
if err != nil {
return nil , err
}
2019-07-26 04:54:22 +00:00
fts := & store . FullTipSet { }
2019-07-05 14:29:17 +00:00
for _ , b := range ts . Blocks ( ) {
2019-08-01 20:40:47 +00:00
bmsgs , smsgs , err := syncer . store . MessagesForBlock ( b )
2019-07-05 14:29:17 +00:00
if err != nil {
return nil , err
}
2019-07-25 22:15:03 +00:00
fb := & types . FullBlock {
2019-08-01 20:40:47 +00:00
Header : b ,
BlsMessages : bmsgs ,
SecpkMessages : smsgs ,
2019-07-05 14:29:17 +00:00
}
fts . Blocks = append ( fts . Blocks , fb )
}
return fts , nil
}
2019-07-30 16:39:07 +00:00
func ( syncer * Syncer ) Sync ( maybeHead * store . FullTipSet ) error {
2019-07-26 18:16:57 +00:00
syncer . syncLock . Lock ( )
defer syncer . syncLock . Unlock ( )
2019-07-05 14:29:17 +00:00
ts := maybeHead . TipSet ( )
2019-08-02 22:21:46 +00:00
if syncer . Genesis . Equals ( ts ) || syncer . store . GetHeaviestTipSet ( ) . Equals ( ts ) {
2019-07-05 14:29:17 +00:00
return nil
}
2019-07-31 07:13:49 +00:00
if err := syncer . collectChain ( maybeHead ) ; err != nil {
2019-08-16 00:17:09 +00:00
return xerrors . Errorf ( "collectChain failed: %w" , err )
2019-07-05 14:29:17 +00:00
}
if err := syncer . store . PutTipSet ( maybeHead ) ; err != nil {
return errors . Wrap ( err , "failed to put synced tipset to chainstore" )
}
return nil
}
2019-07-26 19:01:02 +00:00
func ( syncer * Syncer ) ValidateTipSet ( ctx context . Context , fts * store . FullTipSet ) error {
2019-07-05 14:29:17 +00:00
ts := fts . TipSet ( )
2019-07-05 14:46:21 +00:00
if ts . Equals ( syncer . Genesis ) {
2019-07-05 14:29:17 +00:00
return nil
}
for _ , b := range fts . Blocks {
2019-07-26 19:01:02 +00:00
if err := syncer . ValidateBlock ( ctx , b ) ; err != nil {
2019-09-20 02:54:52 +00:00
return xerrors . Errorf ( "validating block %s: %w" , b . Cid ( ) , err )
2019-07-05 14:29:17 +00:00
}
}
return nil
}
2019-08-16 00:17:09 +00:00
func ( syncer * Syncer ) minerIsValid ( ctx context . Context , maddr address . Address , baseTs * types . TipSet ) error {
var err error
enc , err := actors . SerializeParams ( & actors . IsMinerParam { Addr : maddr } )
if err != nil {
return err
}
2019-09-09 20:03:10 +00:00
ret , err := syncer . sm . Call ( ctx , & types . Message {
2019-08-16 00:17:09 +00:00
To : actors . StorageMarketAddress ,
From : maddr ,
Method : actors . SMAMethods . IsMiner ,
Params : enc ,
} , baseTs )
if err != nil {
return xerrors . Errorf ( "checking if block miner is valid failed: %w" , err )
}
if ret . ExitCode != 0 {
return xerrors . Errorf ( "StorageMarket.IsMiner check failed (exit code %d)" , ret . ExitCode )
}
// TODO: ensure the miner is currently not late on their PoSt submission (this hasnt landed in the spec yet)
return nil
}
// validateTickets checks the ticket chain of a block.
//
// Starting from the minimum ticket of base, each ticket in tickets must
// carry a VRF proof that verifies as a BLS signature by mworker over the
// previous ticket's VDF result, and a VDF proof that verifies for its own VRF
// proof and VDF result. An empty ticket list is rejected.
func (syncer *Syncer) validateTickets(ctx context.Context, mworker address.Address, tickets []*types.Ticket, base *types.TipSet) error {
	if len(tickets) == 0 {
		return xerrors.Errorf("block had no tickets")
	}

	cur := base.MinTicket()
	for i := 0; i < len(tickets); i++ {
		next := tickets[i]

		sig := &types.Signature{
			Type: types.KTBLS,
			Data: next.VRFProof,
		}

		// The VRF proof is a signature over the previous ticket's VDF output.
		if err := sig.Verify(mworker, cur.VDFResult); err != nil {
			return xerrors.Errorf("invalid ticket, VRFProof invalid: %w", err)
		}

		// now verify the VDF
		if err := vdf.Verify(next.VRFProof, next.VDFResult, next.VDFProof); err != nil {
			return xerrors.Errorf("ticket %d had invalid VDF: %w", i, err)
		}

		cur = next
	}

	return nil
}
// Should match up with 'Semantical Validation' in validation.md in the spec
2019-07-26 19:01:02 +00:00
func ( syncer * Syncer ) ValidateBlock ( ctx context . Context , b * types . FullBlock ) error {
2019-07-05 14:29:17 +00:00
h := b . Header
2019-09-06 06:26:02 +00:00
stateroot , err := syncer . sm . TipSetState ( h . Parents )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-08-02 22:21:46 +00:00
return xerrors . Errorf ( "get tipsetstate(%d, %s) failed: %w" , h . Height , h . Parents , err )
2019-07-05 14:29:17 +00:00
}
2019-08-02 22:21:46 +00:00
2019-08-16 00:17:09 +00:00
baseTs , err := syncer . store . LoadTipSet ( h . Parents )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-08-21 17:40:28 +00:00
return xerrors . Errorf ( "load tipset failed: %w" , err )
2019-07-05 14:29:17 +00:00
}
2019-08-30 02:59:54 +00:00
if h . Timestamp > uint64 ( time . Now ( ) . Unix ( ) + build . AllowableClockDrift ) {
return xerrors . Errorf ( "block was from the future" )
}
if h . Timestamp < baseTs . MinTimestamp ( ) + uint64 ( build . BlockDelay * len ( h . Tickets ) ) {
log . Warn ( "timestamp funtimes: " , h . Timestamp , baseTs . MinTimestamp ( ) , len ( h . Tickets ) )
2019-09-06 17:44:09 +00:00
return xerrors . Errorf ( "block was generated too soon (h.ts:%d < base.mints:%d + BLOCK_DELAY:%d * tkts.len:%d)" , h . Timestamp , baseTs . MinTimestamp ( ) , build . BlockDelay , len ( h . Tickets ) )
2019-08-30 02:59:54 +00:00
}
2019-08-16 00:17:09 +00:00
if err := syncer . minerIsValid ( ctx , h . Miner , baseTs ) ; err != nil {
return xerrors . Errorf ( "minerIsValid failed: %w" , err )
}
2019-09-06 06:26:02 +00:00
waddr , err := stmgr . GetMinerWorker ( ctx , syncer . sm , stateroot , h . Miner )
2019-08-16 00:17:09 +00:00
if err != nil {
2019-09-06 06:26:02 +00:00
return xerrors . Errorf ( "GetMinerWorker failed: %w" , err )
2019-08-16 00:17:09 +00:00
}
2019-09-12 04:12:35 +00:00
if err := h . CheckBlockSignature ( waddr ) ; err != nil {
2019-09-11 20:10:29 +00:00
return xerrors . Errorf ( "check block signature failed: %w" , err )
}
2019-08-16 00:17:09 +00:00
if err := syncer . validateTickets ( ctx , waddr , h . Tickets , baseTs ) ; err != nil {
return xerrors . Errorf ( "validating block tickets failed: %w" , err )
}
2019-09-20 12:22:46 +00:00
rand , err := syncer . sm . ChainStore ( ) . GetRandomness ( ctx , baseTs . Cids ( ) , h . Tickets , build . RandomnessLookback )
2019-09-06 06:26:02 +00:00
if err != nil {
return xerrors . Errorf ( "failed to get randomness for verifying election proof: %w" , err )
}
if err := VerifyElectionProof ( ctx , h . ElectionProof , rand , waddr ) ; err != nil {
return xerrors . Errorf ( "checking eproof failed: %w" , err )
}
2019-09-09 20:03:10 +00:00
mpow , tpow , err := stmgr . GetPower ( ctx , syncer . sm , baseTs , h . Miner )
2019-09-06 06:26:02 +00:00
if err != nil {
return xerrors . Errorf ( "failed getting power: %w" , err )
}
if ! types . PowerCmp ( h . ElectionProof , mpow , tpow ) {
return xerrors . Errorf ( "miner created a block but was not a winner" )
}
2019-09-20 12:22:46 +00:00
r := vm . NewChainRand ( syncer . store , baseTs . Cids ( ) , baseTs . Height ( ) , h . Tickets )
2019-09-20 02:54:52 +00:00
vmi , err := vm . NewVM ( stateroot , h . Height , r , h . Miner , syncer . store )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-08-21 17:40:28 +00:00
return xerrors . Errorf ( "failed to instantiate VM: %w" , err )
2019-07-05 14:29:17 +00:00
}
2019-09-06 06:26:02 +00:00
owner , err := stmgr . GetMinerOwner ( ctx , syncer . sm , stateroot , b . Header . Miner )
2019-08-27 00:46:39 +00:00
if err != nil {
return xerrors . Errorf ( "getting miner owner for block miner failed: %w" , err )
}
if err := vmi . TransferFunds ( actors . NetworkAddress , owner , vm . MiningRewardForBlock ( baseTs ) ) ; err != nil {
2019-08-21 17:40:28 +00:00
return xerrors . Errorf ( "fund transfer failed: %w" , err )
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
var receipts [ ] cbg . CBORMarshaler
2019-08-01 20:40:47 +00:00
for i , m := range b . BlsMessages {
receipt , err := vmi . ApplyMessage ( ctx , m )
if err != nil {
return xerrors . Errorf ( "failed executing bls message %d in block %s: %w" , i , b . Header . Cid ( ) , err )
}
2019-09-17 01:56:37 +00:00
receipts = append ( receipts , & receipt . MessageReceipt )
2019-08-01 20:40:47 +00:00
}
for i , m := range b . SecpkMessages {
2019-07-25 22:15:33 +00:00
receipt , err := vmi . ApplyMessage ( ctx , & m . Message )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-08-01 20:40:47 +00:00
return xerrors . Errorf ( "failed executing secpk message %d in block %s: %w" , i , b . Header . Cid ( ) , err )
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
receipts = append ( receipts , & receipt . MessageReceipt )
2019-07-05 14:29:17 +00:00
}
2019-09-17 01:56:37 +00:00
bs := amt . WrapBlockstore ( syncer . store . Blockstore ( ) )
recptRoot , err := amt . FromArray ( bs , receipts )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-09-17 18:08:03 +00:00
return xerrors . Errorf ( "building receipts amt failed: %w" , err )
2019-07-05 14:29:17 +00:00
}
if recptRoot != b . Header . MessageReceipts {
return fmt . Errorf ( "receipts mismatched" )
}
2019-07-25 22:15:33 +00:00
final , err := vmi . Flush ( context . TODO ( ) )
2019-07-05 14:29:17 +00:00
if err != nil {
2019-08-21 17:40:28 +00:00
return xerrors . Errorf ( "failed to flush VM state: %w" , err )
2019-07-05 14:29:17 +00:00
}
if b . Header . StateRoot != final {
return fmt . Errorf ( "final state root does not match block" )
}
return nil
}
2019-08-02 22:21:46 +00:00
func ( syncer * Syncer ) collectHeaders ( from * types . TipSet , to * types . TipSet ) ( [ ] * types . TipSet , error ) {
2019-07-31 07:13:49 +00:00
blockSet := [ ] * types . TipSet { from }
2019-07-05 14:29:17 +00:00
2019-07-31 07:13:49 +00:00
at := from . Parents ( )
2019-07-05 14:29:17 +00:00
2019-08-02 22:21:46 +00:00
// we want to sync all the blocks until the height above the block we have
untilHeight := to . Height ( ) + 1
2019-07-31 07:13:49 +00:00
// If, for some reason, we have a suffix of the chain locally, handle that here
2019-08-02 22:21:46 +00:00
for blockSet [ len ( blockSet ) - 1 ] . Height ( ) > untilHeight {
2019-07-31 07:13:49 +00:00
log . Warn ( "syncing local: " , at )
ts , err := syncer . store . LoadTipSet ( at )
if err != nil {
if err == bstore . ErrNotFound {
log . Info ( "tipset not found locally, starting sync: " , at )
break
}
log . Warn ( "loading local tipset: %s" , err )
continue // TODO: verify
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
blockSet = append ( blockSet , ts )
at = ts . Parents ( )
}
2019-07-26 16:13:25 +00:00
2019-09-06 20:03:28 +00:00
loop :
2019-08-02 22:21:46 +00:00
for blockSet [ len ( blockSet ) - 1 ] . Height ( ) > untilHeight {
2019-07-31 07:13:49 +00:00
// NB: GetBlocks validates that the blocks are in-fact the ones we
// requested, and that they are correctly linked to eachother. It does
// not validate any state transitions
2019-09-09 20:03:10 +00:00
window := 10
if gap := int ( blockSet [ len ( blockSet ) - 1 ] . Height ( ) - untilHeight ) ; gap < window {
window = gap
}
blks , err := syncer . Bsync . GetBlocks ( context . TODO ( ) , at , window )
2019-07-31 07:13:49 +00:00
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
log . Error ( "failed to get blocks: " , err )
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
// This error will only be logged above,
return nil , xerrors . Errorf ( "failed to get blocks: %w" , err )
2019-07-30 13:55:36 +00:00
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
for _ , b := range blks {
2019-08-02 22:21:46 +00:00
if b . Height ( ) < untilHeight {
2019-09-06 20:03:28 +00:00
break loop
2019-08-02 22:21:46 +00:00
}
2019-07-31 07:13:49 +00:00
blockSet = append ( blockSet , b )
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
at = blks [ len ( blks ) - 1 ] . Parents ( )
}
2019-07-26 16:13:25 +00:00
2019-09-08 20:14:01 +00:00
// We have now ascertained that this is *not* a 'fast forward'
2019-09-03 04:36:07 +00:00
if ! types . CidArrsEqual ( blockSet [ len ( blockSet ) - 1 ] . Parents ( ) , to . Cids ( ) ) {
2019-09-08 20:14:01 +00:00
last := blockSet [ len ( blockSet ) - 1 ]
if types . CidArrsEqual ( last . Parents ( ) , to . Parents ( ) ) {
// common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet , nil
}
2019-08-02 22:21:46 +00:00
// TODO: handle the case where we are on a fork and its not a simple fast forward
2019-09-08 20:14:01 +00:00
// need to walk back to either a common ancestor, or until we hit the fork length threshold.
return nil , xerrors . Errorf ( "(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)" , from . Cids ( ) , from . Height ( ) , to . Cids ( ) , to . Height ( ) )
2019-07-31 07:13:49 +00:00
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
return blockSet , nil
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
func ( syncer * Syncer ) syncMessagesAndCheckState ( headers [ ] * types . TipSet ) error {
2019-08-02 00:13:57 +00:00
return syncer . iterFullTipsets ( headers , func ( fts * store . FullTipSet ) error {
2019-09-25 13:38:59 +00:00
log . Debugf ( "validating tipset (heigt=%d, size=%d)" , fts . TipSet ( ) . Height ( ) , len ( fts . TipSet ( ) . Cids ( ) ) )
2019-08-02 00:13:57 +00:00
if err := syncer . ValidateTipSet ( context . TODO ( ) , fts ) ; err != nil {
log . Errorf ( "failed to validate tipset: %s" , err )
return xerrors . Errorf ( "message processing failed: %w" , err )
}
2019-09-06 20:03:28 +00:00
2019-08-02 00:13:57 +00:00
return nil
} )
}
// fills out each of the given tipsets with messages and calls the callback with it
func ( syncer * Syncer ) iterFullTipsets ( headers [ ] * types . TipSet , cb func ( * store . FullTipSet ) error ) error {
2019-08-02 22:21:46 +00:00
beg := len ( headers ) - 1
2019-08-02 00:13:57 +00:00
// handle case where we have a prefix of these locally
2019-08-02 22:21:46 +00:00
for ; beg >= 0 ; beg -- {
2019-08-02 00:13:57 +00:00
fts , err := syncer . store . TryFillTipSet ( headers [ beg ] )
if err != nil {
return err
}
if fts == nil {
break
}
if err := cb ( fts ) ; err != nil {
return err
}
}
2019-08-02 22:21:46 +00:00
headers = headers [ : beg + 1 ]
2019-08-02 00:13:57 +00:00
windowSize := 10
2019-07-30 13:55:36 +00:00
2019-08-02 22:21:46 +00:00
for i := len ( headers ) - 1 ; i >= 0 ; i -= windowSize {
2019-07-26 18:16:57 +00:00
2019-08-02 00:13:57 +00:00
batchSize := windowSize
2019-08-02 22:21:46 +00:00
if i < batchSize {
batchSize = i
2019-07-30 13:55:36 +00:00
}
2019-07-26 16:13:25 +00:00
2019-08-02 22:21:46 +00:00
next := headers [ i - batchSize ]
2019-08-02 00:13:57 +00:00
bstips , err := syncer . Bsync . GetChainMessages ( context . TODO ( ) , next , uint64 ( batchSize + 1 ) )
2019-07-31 07:13:49 +00:00
if err != nil {
return xerrors . Errorf ( "message processing failed: %w" , err )
}
2019-07-30 13:55:36 +00:00
2019-07-31 07:13:49 +00:00
for bsi := 0 ; bsi < len ( bstips ) ; bsi ++ {
2019-09-06 20:03:28 +00:00
// temp storage so we don't persist data we dont want to
ds := dstore . NewMapDatastore ( )
bs := bstore . NewBlockstore ( ds )
2019-09-17 01:56:37 +00:00
blks := amt . WrapBlockstore ( bs )
2019-09-06 20:03:28 +00:00
2019-08-02 22:21:46 +00:00
this := headers [ i - bsi ]
2019-07-31 07:13:49 +00:00
bstip := bstips [ len ( bstips ) - ( bsi + 1 ) ]
2019-09-17 01:56:37 +00:00
fts , err := zipTipSetAndMessages ( blks , this , bstip . BlsMessages , bstip . SecpkMessages , bstip . BlsMsgIncludes , bstip . SecpkMsgIncludes )
2019-07-30 13:55:36 +00:00
if err != nil {
2019-08-02 22:21:46 +00:00
log . Warn ( "zipping failed: " , err , bsi , i )
log . Warn ( "height: " , this . Height ( ) )
log . Warn ( "bstip height: " , bstip . Blocks [ 0 ] . Height )
log . Warn ( "bstips: " , bstips )
log . Warn ( "next height: " , i + batchSize )
2019-07-31 07:13:49 +00:00
return xerrors . Errorf ( "message processing failed: %w" , err )
2019-07-30 13:55:36 +00:00
}
2019-07-26 18:16:57 +00:00
2019-08-02 00:13:57 +00:00
if err := cb ( fts ) ; err != nil {
return err
2019-07-30 13:55:36 +00:00
}
2019-07-26 18:16:57 +00:00
2019-09-06 20:03:28 +00:00
if err := persistMessages ( bs , bstip ) ; err != nil {
return err
}
2019-07-26 16:13:25 +00:00
2019-09-06 20:03:28 +00:00
if err := copyBlockstore ( bs , syncer . store . Blockstore ( ) ) ; err != nil {
return xerrors . Errorf ( "message processing failed: %w" , err )
}
2019-07-05 14:29:17 +00:00
}
2019-09-06 20:03:28 +00:00
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2019-07-31 07:13:49 +00:00
return nil
}
2019-07-05 14:29:17 +00:00
2019-09-06 20:03:28 +00:00
func persistMessages ( bs bstore . Blockstore , bst * BSTipSet ) error {
for _ , m := range bst . BlsMessages {
//log.Infof("putting BLS message: %s", m.Cid())
if _ , err := store . PutMessage ( bs , m ) ; err != nil {
log . Error ( "failed to persist messages: " , err )
return xerrors . Errorf ( "BLS message processing failed: %w" , err )
2019-08-01 20:40:47 +00:00
}
2019-09-06 20:03:28 +00:00
}
for _ , m := range bst . SecpkMessages {
if m . Signature . Type != types . KTSecp256k1 {
return xerrors . Errorf ( "unknown signature type on message %s: %q" , m . Cid ( ) , m . Signature . TypeCode )
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _ , err := store . PutMessage ( bs , m ) ; err != nil {
log . Error ( "failed to persist messages: " , err )
return xerrors . Errorf ( "secp256k1 message processing failed: %w" , err )
2019-08-02 00:13:57 +00:00
}
}
return nil
}
2019-07-31 07:13:49 +00:00
func ( syncer * Syncer ) collectChain ( fts * store . FullTipSet ) error {
2019-08-02 22:21:46 +00:00
headers , err := syncer . collectHeaders ( fts . TipSet ( ) , syncer . store . GetHeaviestTipSet ( ) )
2019-07-31 07:13:49 +00:00
if err != nil {
return err
}
for _ , ts := range headers {
for _ , b := range ts . Blocks ( ) {
if err := syncer . store . PersistBlockHeader ( b ) ; err != nil {
return xerrors . Errorf ( "failed to persist synced blocks to the chainstore: %w" , err )
}
2019-07-30 13:55:36 +00:00
}
2019-07-05 14:29:17 +00:00
}
2019-07-31 07:13:49 +00:00
if err := syncer . syncMessagesAndCheckState ( headers ) ; err != nil {
2019-08-16 00:17:09 +00:00
return xerrors . Errorf ( "collectChain syncMessages: %w" , err )
2019-07-31 07:13:49 +00:00
}
return nil
2019-07-05 14:36:08 +00:00
}
2019-09-06 06:26:02 +00:00
// VerifyElectionProof checks that eproof verifies as a BLS signature by
// worker over rand. ctx is accepted but not used by this function.
func VerifyElectionProof(ctx context.Context, eproof []byte, rand []byte, worker address.Address) error {
	sig := types.Signature{
		Type: types.KTBLS,
		Data: eproof,
	}

	err := sig.Verify(worker, rand)
	if err == nil {
		return nil
	}

	return xerrors.Errorf("failed to verify election proof signature: %w", err)
}