lotus/chain/sync.go

1805 lines
55 KiB
Go
Raw Normal View History

2019-07-05 14:29:17 +00:00
package chain
import (
"bytes"
2019-07-05 14:29:17 +00:00
"context"
"errors"
2019-07-05 14:29:17 +00:00
"fmt"
2020-04-20 17:43:02 +00:00
"os"
"sort"
"strings"
"sync"
"time"
2019-07-05 14:29:17 +00:00
2020-09-29 04:24:38 +00:00
"github.com/filecoin-project/lotus/chain/actors/builtin"
"github.com/filecoin-project/lotus/node/modules/dtypes"
"github.com/Gurpartap/async"
2019-11-11 19:32:30 +00:00
"github.com/hashicorp/go-multierror"
blocks "github.com/ipfs/go-block-format"
2019-07-05 14:29:17 +00:00
"github.com/ipfs/go-cid"
2020-02-04 22:19:05 +00:00
cbor "github.com/ipfs/go-ipld-cbor"
logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p-core/connmgr"
2019-07-30 16:39:07 +00:00
"github.com/libp2p/go-libp2p-core/peer"
2019-09-17 18:09:22 +00:00
cbg "github.com/whyrusleeping/cbor-gen"
2019-11-18 21:39:07 +00:00
"github.com/whyrusleeping/pubsub"
"go.opencensus.io/stats"
2019-10-11 02:17:24 +00:00
"go.opencensus.io/trace"
2019-09-17 18:09:22 +00:00
"golang.org/x/xerrors"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/go-address"
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
"github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
ffi "github.com/filecoin-project/filecoin-ffi"
2020-01-07 16:23:12 +00:00
2020-10-08 01:09:33 +00:00
// named msgarray here to make it clear that these are the types used by
// messages, regardless of specs-actors version.
blockadt "github.com/filecoin-project/specs-actors/actors/util/adt"
proof2 "github.com/filecoin-project/specs-actors/v2/actors/runtime/proof"
2020-09-28 21:25:58 +00:00
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/api"
bstore "github.com/filecoin-project/lotus/blockstore"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/builtin/power"
"github.com/filecoin-project/lotus/chain/beacon"
"github.com/filecoin-project/lotus/chain/exchange"
"github.com/filecoin-project/lotus/chain/gen"
2019-11-11 19:32:30 +00:00
"github.com/filecoin-project/lotus/chain/state"
"github.com/filecoin-project/lotus/chain/stmgr"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
2020-05-12 11:48:09 +00:00
"github.com/filecoin-project/lotus/chain/vm"
"github.com/filecoin-project/lotus/lib/sigs"
"github.com/filecoin-project/lotus/metrics"
2019-07-05 14:29:17 +00:00
)
// Blocks that are more than MaxHeightDrift epochs above
// the theoretical max height based on systime are quickly rejected
const MaxHeightDrift = 5
2020-09-14 20:58:59 +00:00
var (
// LocalIncoming is the _local_ pubsub (unrelated to libp2p pubsub) topic
// where the Syncer publishes candidate chain heads to be synced.
LocalIncoming = "incoming"
2020-09-16 18:04:44 +00:00
log = logging.Logger("chain")
concurrentSyncRequests = exchange.ShufflePeersPrefix
2020-09-16 18:55:51 +00:00
syncRequestBatchSize = 8
2020-09-16 18:04:44 +00:00
syncRequestRetries = 5
)
2020-06-23 21:51:25 +00:00
// Syncer is in charge of running the chain synchronization logic. As such, it
// is tasked with these functions, amongst others:
//
// * Fast-forwards the chain as it learns of new TipSets from the network via
// the SyncManager.
// * Applies the fork choice rule to select the correct side when confronted
// with a fork in the network.
// * Requests block headers and messages from other peers when not available
// in our BlockStore.
// * Tracks blocks marked as bad in a cache.
// * Keeps the BlockStore and ChainStore consistent with our view of the world,
// the latter of which in turn informs other components when a reorg has been
// committed.
//
// The Syncer does not run workers itself. It's mainly concerned with
// ensuring a consistent state of chain consensus. The reactive and network-
// interfacing processes are part of other components, such as the SyncManager
// (which owns the sync scheduler and sync workers), ChainExchange, the HELLO
2020-06-23 21:51:25 +00:00
// protocol, and the gossipsub block propagation layer.
//
// {hint/concept} The fork-choice rule as it currently stands is: "pick the
// chain with the heaviest weight, so long as it hasnt deviated one finality
// threshold from our head (900 epochs, parameter determined by spec-actors)".
2019-07-05 14:29:17 +00:00
type Syncer struct {
// The interface for accessing and putting tipsets into local storage
2019-07-26 04:54:22 +00:00
store *store.ChainStore
2019-07-05 14:29:17 +00:00
// handle to the random beacon for verification
beacon beacon.Schedule
// the state manager handles making state queries
sm *stmgr.StateManager
2019-07-05 14:46:21 +00:00
// The known Genesis tipset
2019-07-26 04:54:22 +00:00
Genesis *types.TipSet
2019-07-05 14:29:17 +00:00
// TipSets known to be invalid
2019-10-09 08:50:57 +00:00
bad *BadBlockCache
2019-07-05 14:29:17 +00:00
// handle to the block sync service
Exchange exchange.Client
2019-07-05 14:29:17 +00:00
2019-07-11 02:36:43 +00:00
self peer.ID
2020-09-14 20:58:59 +00:00
syncmgr SyncManager
2019-11-18 21:39:07 +00:00
connmgr connmgr.ConnManager
2019-11-18 21:39:07 +00:00
incoming *pubsub.PubSub
receiptTracker *blockReceiptTracker
2020-04-17 14:47:19 +00:00
verifier ffiwrapper.Verifier
2020-08-28 19:52:40 +00:00
2020-09-06 04:32:05 +00:00
tickerCtxCancel context.CancelFunc
checkptLk sync.Mutex
checkpt types.TipSetKey
ds dtypes.MetadataDS
2019-07-05 14:29:17 +00:00
}
2020-09-14 20:58:59 +00:00
type SyncManagerCtor func(syncFn SyncFunc) SyncManager
2020-06-23 21:51:25 +00:00
// NewSyncer creates a new Syncer object.
2020-09-14 20:58:59 +00:00
func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, syncMgrCtor SyncManagerCtor, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.Schedule, verifier ffiwrapper.Verifier) (*Syncer, error) {
gen, err := sm.ChainStore().GetGenesis()
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, xerrors.Errorf("getting genesis block: %w", err)
2019-07-05 14:29:17 +00:00
}
2019-07-26 04:54:22 +00:00
gent, err := types.NewTipSet([]*types.BlockHeader{gen})
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
cp, err := loadCheckpoint(ds)
if err != nil {
return nil, xerrors.Errorf("error loading mpool config: %w", err)
}
2019-11-15 21:35:29 +00:00
s := &Syncer{
ds: ds,
checkpt: cp,
beacon: beacon,
bad: NewBadBlockCache(),
Genesis: gent,
Exchange: exchange,
store: sm.ChainStore(),
sm: sm,
self: self,
receiptTracker: newBlockReceiptTracker(),
connmgr: connmgr,
2020-04-17 14:47:19 +00:00
verifier: verifier,
2019-11-18 21:39:07 +00:00
2019-11-20 19:44:38 +00:00
incoming: pubsub.New(50),
2019-11-15 21:35:29 +00:00
}
2020-08-07 13:53:55 +00:00
if build.InsecurePoStValidation {
log.Warn("*********************************************************************************************")
log.Warn(" [INSECURE-POST-VALIDATION] Insecure test validation is enabled. If you see this outside of a test, it is a severe bug! ")
log.Warn("*********************************************************************************************")
}
2020-09-14 20:58:59 +00:00
s.syncmgr = syncMgrCtor(s.Sync)
2019-11-15 21:35:29 +00:00
return s, nil
2019-07-05 14:29:17 +00:00
}
2019-11-15 21:35:29 +00:00
func (syncer *Syncer) Start() {
2020-09-06 04:32:05 +00:00
tickerCtx, tickerCtxCancel := context.WithCancel(context.Background())
2019-11-15 21:35:29 +00:00
syncer.syncmgr.Start()
2020-09-06 04:32:05 +00:00
syncer.tickerCtxCancel = tickerCtxCancel
go syncer.runMetricsTricker(tickerCtx)
}
func (syncer *Syncer) runMetricsTricker(tickerCtx context.Context) {
genesisTime := time.Unix(int64(syncer.Genesis.MinTimestamp()), 0)
ticker := build.Clock.Ticker(time.Duration(build.BlockDelaySecs) * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
sinceGenesis := build.Clock.Now().Sub(genesisTime)
expectedHeight := int64(sinceGenesis.Seconds()) / int64(build.BlockDelaySecs)
2020-09-07 16:21:45 +00:00
stats.Record(tickerCtx, metrics.ChainNodeHeightExpected.M(expectedHeight))
2020-09-06 04:32:05 +00:00
case <-tickerCtx.Done():
return
}
}
2019-11-15 21:35:29 +00:00
}
func (syncer *Syncer) Stop() {
syncer.syncmgr.Stop()
2020-09-06 04:32:05 +00:00
syncer.tickerCtxCancel()
2019-07-05 14:29:17 +00:00
}
// InformNewHead informs the syncer about a new potential tipset
// This should be called when connecting to new peers, and additionally
// when receiving new blocks from the network
func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
defer func() {
if err := recover(); err != nil {
log.Errorf("panic in InformNewHead: ", err)
}
}()
2019-10-10 11:13:26 +00:00
ctx := context.Background()
2019-07-05 14:29:17 +00:00
if fts == nil {
2019-11-16 23:41:14 +00:00
log.Errorf("got nil tipset in InformNewHead")
return false
2019-07-05 14:29:17 +00:00
}
if syncer.IsEpochBeyondCurrMax(fts.TipSet().Height()) {
log.Errorf("Received block with impossibly large height %d", fts.TipSet().Height())
return false
}
for _, b := range fts.Blocks {
if reason, ok := syncer.bad.Has(b.Cid()); ok {
log.Warnf("InformNewHead called on block marked as bad: %s (reason: %s)", b.Cid(), reason)
return false
}
2019-10-15 12:19:10 +00:00
if err := syncer.ValidateMsgMeta(b); err != nil {
log.Warnf("invalid block received: %s", err)
return false
}
}
2019-12-01 23:11:43 +00:00
syncer.incoming.Pub(fts.TipSet().Blocks(), LocalIncoming)
2019-11-18 21:39:07 +00:00
2019-11-15 03:19:16 +00:00
// TODO: IMPORTANT(GARBAGE) this needs to be put in the 'temporary' side of
// the blockstore
if err := syncer.store.PersistBlockHeaders(fts.TipSet().Blocks()...); err != nil {
log.Warn("failed to persist incoming block header: ", err)
return false
2019-11-15 03:19:16 +00:00
}
syncer.Exchange.AddPeer(from)
2019-07-05 14:29:17 +00:00
hts := syncer.store.GetHeaviestTipSet()
bestPweight := hts.ParentWeight()
targetWeight := fts.TipSet().ParentWeight()
2019-11-10 23:06:06 +00:00
if targetWeight.LessThan(bestPweight) {
2019-12-04 04:59:41 +00:00
var miners []string
for _, blk := range fts.TipSet().Blocks() {
miners = append(miners, blk.Miner.String())
}
2020-11-03 12:28:31 +00:00
log.Debugw("incoming tipset does not appear to be better than our best chain, ignoring for now", "miners", miners, "bestPweight", bestPweight, "bestTS", hts.Cids(), "incomingWeight", targetWeight, "incomingTS", fts.TipSet().Cids())
return false
}
2019-11-16 06:48:42 +00:00
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
return true
2019-07-05 14:29:17 +00:00
}
2020-06-23 21:51:25 +00:00
// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to
// receive new block headers as they arrive from the network, and sends them to
// the returned channel.
//
// These blocks have not necessarily been incorporated to our view of the chain.
2019-11-18 21:39:07 +00:00
func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
2019-12-01 23:11:43 +00:00
sub := syncer.incoming.Sub(LocalIncoming)
2019-11-18 21:39:07 +00:00
out := make(chan *types.BlockHeader, 10)
go func() {
2019-12-01 23:11:43 +00:00
defer syncer.incoming.Unsub(sub, LocalIncoming)
2019-11-19 19:49:11 +00:00
2019-11-18 21:39:07 +00:00
for {
select {
case r := <-sub:
hs := r.([]*types.BlockHeader)
for _, h := range hs {
select {
case out <- h:
case <-ctx.Done():
return
}
}
case <-ctx.Done():
return
}
}
}()
return out, nil
}
2020-06-23 21:51:25 +00:00
// ValidateMsgMeta performs structural and content hash validation of the
// messages within this block. If validation passes, it stores the messages in
// the underlying IPLD block store.
2019-10-15 12:19:10 +00:00
func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
if msgc := len(fblk.BlsMessages) + len(fblk.SecpkMessages); msgc > build.BlockMessageLimit {
return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc)
}
2019-11-15 03:19:16 +00:00
// TODO: IMPORTANT(GARBAGE). These message puts and the msgmeta
// computation need to go into the 'temporary' side of the blockstore when
// we implement that
// We use a temporary bstore here to avoid writing intermediate pieces
// into the blockstore.
blockstore := bstore.NewMemory()
cst := cbor.NewCborStore(blockstore)
2020-06-23 21:51:25 +00:00
var bcids, scids []cid.Cid
2019-11-15 03:19:16 +00:00
for _, m := range fblk.BlsMessages {
c, err := store.PutMessage(blockstore, m)
2019-11-15 03:19:16 +00:00
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
bcids = append(bcids, c)
2019-11-15 03:19:16 +00:00
}
for _, m := range fblk.SecpkMessages {
c, err := store.PutMessage(blockstore, m)
2019-11-15 03:19:16 +00:00
if err != nil {
return xerrors.Errorf("putting bls message to blockstore after msgmeta computation: %w", err)
}
scids = append(scids, c)
2019-11-15 03:19:16 +00:00
}
// Compute the root CID of the combined message trie.
smroot, err := computeMsgMeta(cst, bcids, scids)
if err != nil {
return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
}
// Check that the message trie root matches with what's in the block.
if fblk.Header.Messages != smroot {
return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
}
// Finally, flush.
return vm.Copy(context.TODO(), blockstore, syncer.store.ChainBlockstore(), smroot)
}
func (syncer *Syncer) LocalPeer() peer.ID {
return syncer.self
}
func (syncer *Syncer) ChainStore() *store.ChainStore {
return syncer.store
}
func (syncer *Syncer) InformNewBlock(from peer.ID, blk *types.FullBlock) bool {
2019-07-05 14:29:17 +00:00
// TODO: search for other blocks that could form a tipset with this block
// and then send that tipset to InformNewHead
2019-07-26 04:54:22 +00:00
fts := &store.FullTipSet{Blocks: []*types.FullBlock{blk}}
return syncer.InformNewHead(from, fts)
2019-07-05 14:29:17 +00:00
}
func copyBlockstore(ctx context.Context, from, to bstore.Blockstore) error {
ctx, span := trace.StartSpan(ctx, "copyBlockstore")
defer span.End()
cids, err := from.AllKeysChan(ctx)
2019-07-05 14:29:17 +00:00
if err != nil {
return err
}
// TODO: should probably expose better methods on the blockstore for this operation
var blks []blocks.Block
2019-07-05 14:29:17 +00:00
for c := range cids {
b, err := from.Get(c)
if err != nil {
return err
}
blks = append(blks, b)
}
if err := to.PutMany(blks); err != nil {
return err
2019-07-05 14:29:17 +00:00
}
return nil
}
// TODO: this function effectively accepts unchecked input from the network,
// either validate it here, or ensure that its validated elsewhere (maybe make
// sure the blocksync code checks it?)
// maybe this code should actually live in blocksync??
func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types.Message, allsmsgs []*types.SignedMessage, bmi, smi [][]uint64) (*store.FullTipSet, error) {
if len(ts.Blocks()) != len(smi) || len(ts.Blocks()) != len(bmi) {
2019-07-05 14:29:17 +00:00
return nil, fmt.Errorf("msgincl length didnt match tipset size")
}
2019-07-26 04:54:22 +00:00
fts := &store.FullTipSet{}
2019-07-05 14:29:17 +00:00
for bi, b := range ts.Blocks() {
if msgc := len(bmi[bi]) + len(smi[bi]); msgc > build.BlockMessageLimit {
return nil, fmt.Errorf("block %q has too many messages (%d)", b.Cid(), msgc)
}
var smsgs []*types.SignedMessage
var smsgCids []cid.Cid
for _, m := range smi[bi] {
smsgs = append(smsgs, allsmsgs[m])
smsgCids = append(smsgCids, allsmsgs[m].Cid())
2019-07-05 14:29:17 +00:00
}
var bmsgs []*types.Message
var bmsgCids []cid.Cid
for _, m := range bmi[bi] {
bmsgs = append(bmsgs, allbmsgs[m])
bmsgCids = append(bmsgCids, allbmsgs[m].Cid())
}
mrcid, err := computeMsgMeta(bs, bmsgCids, smsgCids)
if err != nil {
return nil, err
}
if b.Messages != mrcid {
2020-04-01 18:35:09 +00:00
return nil, fmt.Errorf("messages didnt match message root in header for ts %s", ts.Key())
2019-07-05 14:29:17 +00:00
}
fb := &types.FullBlock{
Header: b,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
2019-07-05 14:29:17 +00:00
}
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}
2020-06-23 21:51:25 +00:00
// computeMsgMeta computes the root CID of the combined arrays of message CIDs
// of both types (BLS and Secpk).
func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cid.Cid) (cid.Cid, error) {
2020-09-28 21:25:58 +00:00
// block headers use adt0
2020-10-08 01:09:33 +00:00
store := blockadt.WrapStore(context.TODO(), bs)
bmArr := blockadt.MakeEmptyArray(store)
smArr := blockadt.MakeEmptyArray(store)
for i, m := range bmsgCids {
c := cbg.CborCid(m)
if err := bmArr.Set(uint64(i), &c); err != nil {
return cid.Undef, err
}
}
for i, m := range smsgCids {
c := cbg.CborCid(m)
if err := smArr.Set(uint64(i), &c); err != nil {
return cid.Undef, err
}
}
bmroot, err := bmArr.Root()
if err != nil {
return cid.Undef, err
}
smroot, err := smArr.Root()
if err != nil {
return cid.Undef, err
}
mrcid, err := store.Put(store.Context(), &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return cid.Undef, xerrors.Errorf("failed to put msgmeta: %w", err)
}
return mrcid, nil
}
2020-06-23 21:51:25 +00:00
// FetchTipSet tries to load the provided tipset from the store, and falls back
// to the network (client) by querying the supplied peer if not found
2020-06-23 21:51:25 +00:00
// locally.
//
// {hint/usage} This is used from the HELLO protocol, to fetch the greeting
// peer's heaviest tipset if we don't have it.
2019-12-16 19:22:56 +00:00
func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil {
2019-07-05 14:29:17 +00:00
return fts, nil
}
2020-06-23 21:51:25 +00:00
// fall back to the network.
return syncer.Exchange.GetFullTipSet(ctx, p, tsk)
2019-07-05 14:29:17 +00:00
}
2020-06-23 21:51:25 +00:00
// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full
// representation of it containing FullBlocks. If ALL blocks are not found
// locally, it errors entirely with blockstore.ErrNotFound.
2019-12-16 19:22:56 +00:00
func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) {
ts, err := syncer.store.LoadTipSet(tsk)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
2019-07-26 04:54:22 +00:00
fts := &store.FullTipSet{}
2019-07-05 14:29:17 +00:00
for _, b := range ts.Blocks() {
bmsgs, smsgs, err := syncer.store.MessagesForBlock(b)
2019-07-05 14:29:17 +00:00
if err != nil {
return nil, err
}
fb := &types.FullBlock{
Header: b,
BlsMessages: bmsgs,
SecpkMessages: smsgs,
2019-07-05 14:29:17 +00:00
}
fts.Blocks = append(fts.Blocks, fb)
}
return fts, nil
}
2020-06-23 21:51:25 +00:00
// Sync tries to advance our view of the chain to `maybeHead`. It does nothing
// if our current head is heavier than the requested tipset, or if we're already
// at the requested head, or if the head is the genesis.
//
// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the
// godocs on that method for a more detailed view.
2019-10-10 11:13:26 +00:00
func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "chain.Sync")
defer span.End()
2019-11-16 01:05:16 +00:00
2019-11-10 23:06:06 +00:00
if span.IsRecordingEvents() {
span.AddAttributes(
trace.StringAttribute("tipset", fmt.Sprint(maybeHead.Cids())),
trace.Int64Attribute("height", int64(maybeHead.Height())),
)
}
hts := syncer.store.GetHeaviestTipSet()
if hts.ParentWeight().GreaterThan(maybeHead.ParentWeight()) {
2019-11-20 19:44:38 +00:00
return nil
}
if syncer.Genesis.Equals(maybeHead) || hts.Equals(maybeHead) {
2019-07-05 14:29:17 +00:00
return nil
}
if err := syncer.collectChain(ctx, maybeHead, hts); err != nil {
2019-11-10 23:06:06 +00:00
span.AddAttributes(trace.StringAttribute("col_error", err.Error()))
2019-12-04 23:20:02 +00:00
span.SetStatus(trace.Status{
Code: 13,
Message: err.Error(),
})
return xerrors.Errorf("collectChain failed: %w", err)
2019-07-05 14:29:17 +00:00
}
// At this point we have accepted and synced to the new `maybeHead`
// (`StageSyncComplete`).
2019-10-15 05:00:30 +00:00
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
2019-11-10 23:06:06 +00:00
span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
2019-12-04 23:20:02 +00:00
span.SetStatus(trace.Status{
Code: 13,
Message: err.Error(),
})
return xerrors.Errorf("failed to put synced tipset to chainstore: %w", err)
2019-07-05 14:29:17 +00:00
}
peers := syncer.receiptTracker.GetPeers(maybeHead)
if len(peers) > 0 {
syncer.connmgr.TagPeer(peers[0], "new-block", 40)
for _, p := range peers[1:] {
syncer.connmgr.TagPeer(p, "new-block", 25)
}
}
2019-07-05 14:29:17 +00:00
return nil
}
func isPermanent(err error) bool {
return !errors.Is(err, ErrTemporal)
}
func (syncer *Syncer) ValidateTipSet(ctx context.Context, fts *store.FullTipSet, useCache bool) error {
ctx, span := trace.StartSpan(ctx, "validateTipSet")
defer span.End()
span.AddAttributes(trace.Int64Attribute("height", int64(fts.TipSet().Height())))
2019-07-05 14:29:17 +00:00
ts := fts.TipSet()
2019-07-05 14:46:21 +00:00
if ts.Equals(syncer.Genesis) {
2019-07-05 14:29:17 +00:00
return nil
}
var futures []async.ErrorFuture
2019-07-05 14:29:17 +00:00
for _, b := range fts.Blocks {
b := b // rebind to a scoped variable
futures = append(futures, async.Err(func() error {
if err := syncer.ValidateBlock(ctx, b, useCache); err != nil {
if isPermanent(err) {
syncer.bad.Add(b.Cid(), NewBadBlockReason([]cid.Cid{b.Cid()}, err.Error()))
}
return xerrors.Errorf("validating block %s: %w", b.Cid(), err)
}
2019-10-10 03:04:10 +00:00
if err := syncer.sm.ChainStore().AddToTipSetTracker(b.Header); err != nil {
return xerrors.Errorf("failed to add validated header to tipset tracker: %w", err)
}
return nil
}))
}
for _, f := range futures {
if err := f.AwaitContext(ctx); err != nil {
return err
2019-10-10 03:04:10 +00:00
}
2019-07-05 14:29:17 +00:00
}
return nil
}
func (syncer *Syncer) minerIsValid(ctx context.Context, maddr address.Address, baseTs *types.TipSet) error {
2020-09-23 06:18:52 +00:00
act, err := syncer.sm.LoadActor(ctx, power.Address, baseTs)
if err != nil {
return xerrors.Errorf("failed to load power actor: %w", err)
}
powState, err := power.Load(syncer.store.ActorStore(ctx), act)
2020-04-13 21:05:34 +00:00
if err != nil {
return xerrors.Errorf("failed to load power actor state: %w", err)
2020-04-13 21:05:34 +00:00
}
_, exist, err := powState.MinerPower(maddr)
if err != nil {
return xerrors.Errorf("failed to look up miner's claim: %w", err)
}
2020-02-12 23:52:36 +00:00
if !exist {
2019-11-14 16:14:52 +00:00
return xerrors.New("miner isn't valid")
}
return nil
}
var ErrTemporal = errors.New("temporal error")
func blockSanityChecks(h *types.BlockHeader) error {
if h.ElectionProof == nil {
2020-05-08 18:06:47 +00:00
return xerrors.Errorf("block cannot have nil election proof")
}
if h.Ticket == nil {
return xerrors.Errorf("block cannot have nil ticket")
}
if h.BlockSig == nil {
return xerrors.Errorf("block had nil signature")
}
if h.BLSAggregate == nil {
return xerrors.Errorf("block had nil bls aggregate signature")
}
if h.Miner.Protocol() != address.ID {
return xerrors.Errorf("block had non-ID miner address")
}
return nil
}
// ValidateBlock should match up with 'Semantical Validation' in validation.md in the spec
func (syncer *Syncer) ValidateBlock(ctx context.Context, b *types.FullBlock, useCache bool) (err error) {
2020-06-14 09:49:20 +00:00
defer func() {
// b.Cid() could panic for empty blocks that are used in tests.
if rerr := recover(); rerr != nil {
err = xerrors.Errorf("validate block panic: %w", rerr)
return
}
}()
if useCache {
isValidated, err := syncer.store.IsBlockValidated(ctx, b.Cid())
if err != nil {
return xerrors.Errorf("check block validation cache %s: %w", b.Cid(), err)
}
2020-06-14 09:49:20 +00:00
if isValidated {
return nil
}
2020-06-14 09:49:20 +00:00
}
2020-07-10 14:43:14 +00:00
validationStart := build.Clock.Now()
defer func() {
stats.Record(ctx, metrics.BlockValidationDurationMilliseconds.M(metrics.SinceInMilliseconds(validationStart)))
log.Infow("block validation", "took", time.Since(validationStart), "height", b.Header.Height, "age", time.Since(time.Unix(int64(b.Header.Timestamp), 0)))
}()
ctx, span := trace.StartSpan(ctx, "validateBlock")
defer span.End()
if err := blockSanityChecks(b.Header); err != nil {
return xerrors.Errorf("incoming header failed basic sanity checks: %w", err)
}
2019-07-05 14:29:17 +00:00
h := b.Header
2019-10-02 20:03:27 +00:00
2019-12-16 19:22:56 +00:00
baseTs, err := syncer.store.LoadTipSet(types.NewTipSetKey(h.Parents...))
2019-10-02 20:03:27 +00:00
if err != nil {
return xerrors.Errorf("load parent tipset failed (%s): %w", h.Parents, err)
2019-10-02 20:03:27 +00:00
}
winPoStNv := syncer.sm.GetNtwkVersion(ctx, baseTs.Height())
lbts, lbst, err := stmgr.GetLookbackTipSetForRound(ctx, syncer.sm, baseTs, h.Height)
if err != nil {
return xerrors.Errorf("failed to get lookback tipset for block: %w", err)
}
prevBeacon, err := syncer.store.GetLatestBeaconEntry(baseTs)
2020-04-07 22:54:01 +00:00
if err != nil {
return xerrors.Errorf("failed to get latest beacon entry: %w", err)
}
2019-11-11 19:26:14 +00:00
// fast checks first
nulls := h.Height - (baseTs.Height() + 1)
if tgtTs := baseTs.MinTimestamp() + build.BlockDelaySecs*uint64(nulls+1); h.Timestamp != tgtTs {
return xerrors.Errorf("block has wrong timestamp: %d != %d", h.Timestamp, tgtTs)
}
2019-11-19 20:07:16 +00:00
2020-07-10 14:43:14 +00:00
now := uint64(build.Clock.Now().Unix())
if h.Timestamp > now+build.AllowableClockDriftSecs {
return xerrors.Errorf("block was from the future (now=%d, blk=%d): %w", now, h.Timestamp, ErrTemporal)
2019-11-11 19:26:14 +00:00
}
if h.Timestamp > now {
2020-07-10 14:43:14 +00:00
log.Warn("Got block from the future, but within threshold", h.Timestamp, build.Clock.Now().Unix())
2019-12-03 20:00:04 +00:00
}
2019-11-11 19:26:14 +00:00
msgsCheck := async.Err(func() error {
2021-04-09 06:58:22 +00:00
if b.Cid() == build.WhitelistedBlock {
return nil
}
2019-11-11 19:26:14 +00:00
if err := syncer.checkBlockMessages(ctx, b, baseTs); err != nil {
return xerrors.Errorf("block had invalid messages: %w", err)
}
return nil
})
minerCheck := async.Err(func() error {
if err := syncer.minerIsValid(ctx, h.Miner, baseTs); err != nil {
return xerrors.Errorf("minerIsValid failed: %w", err)
}
return nil
})
baseFeeCheck := async.Err(func() error {
baseFee, err := syncer.store.ComputeBaseFee(ctx, baseTs)
if err != nil {
return xerrors.Errorf("computing base fee: %w", err)
}
if types.BigCmp(baseFee, b.Header.ParentBaseFee) != 0 {
return xerrors.Errorf("base fee doesn't match: %s (header) != %s (computed)",
b.Header.ParentBaseFee, baseFee)
}
return nil
})
pweight, err := syncer.store.Weight(ctx, baseTs)
if err != nil {
return xerrors.Errorf("getting parent weight: %w", err)
}
if types.BigCmp(pweight, b.Header.ParentWeight) != 0 {
return xerrors.Errorf("parrent weight different: %s (header) != %s (computed)",
b.Header.ParentWeight, pweight)
}
2019-11-11 19:26:14 +00:00
stateRootCheck := async.Err(func() error {
stateroot, precp, err := syncer.sm.TipSetState(ctx, baseTs)
if err != nil {
return xerrors.Errorf("get tipsetstate(%d, %s) failed: %w", h.Height, h.Parents, err)
}
if stateroot != h.ParentStateRoot {
msgs, err := syncer.store.MessagesForTipset(baseTs)
if err != nil {
log.Error("failed to load messages for tipset during tipset state mismatch error: ", err)
} else {
log.Warn("Messages for tipset with mismatching state:")
for i, m := range msgs {
mm := m.VMMessage()
log.Warnf("Message[%d]: from=%s to=%s method=%d params=%x", i, mm.From, mm.To, mm.Method, mm.Params)
}
}
return xerrors.Errorf("parent state root did not match computed state (%s != %s)", stateroot, h.ParentStateRoot)
}
if precp != h.ParentMessageReceipts {
return xerrors.Errorf("parent receipts root did not match computed value (%s != %s)", precp, h.ParentMessageReceipts)
}
return nil
})
2019-09-11 20:10:29 +00:00
// Stuff that needs worker address
waddr, err := stmgr.GetMinerWorkerRaw(ctx, syncer.sm, lbst, h.Miner)
if err != nil {
return xerrors.Errorf("GetMinerWorkerRaw failed: %w", err)
}
winnerCheck := async.Err(func() error {
if h.ElectionProof.WinCount < 1 {
return xerrors.Errorf("block is not claiming to be a winner")
}
eligible, err := stmgr.MinerEligibleToMine(ctx, syncer.sm, h.Miner, baseTs, lbts)
if err != nil {
return xerrors.Errorf("determining if miner has min power failed: %w", err)
}
if !eligible {
return xerrors.New("block's miner is ineligible to mine")
}
rBeacon := *prevBeacon
if len(h.BeaconEntries) != 0 {
rBeacon = h.BeaconEntries[len(h.BeaconEntries)-1]
}
buf := new(bytes.Buffer)
if err := h.Miner.MarshalCBOR(buf); err != nil {
return xerrors.Errorf("failed to marshal miner address to cbor: %w", err)
}
vrfBase, err := store.DrawRandomness(rBeacon.Data, crypto.DomainSeparationTag_ElectionProofProduction, h.Height, buf.Bytes())
if err != nil {
return xerrors.Errorf("could not draw randomness: %w", err)
}
if err := VerifyElectionPoStVRF(ctx, waddr, vrfBase, h.ElectionProof.VRFProof); err != nil {
return xerrors.Errorf("validating block election proof failed: %w", err)
}
slashed, err := stmgr.GetMinerSlashed(ctx, syncer.sm, baseTs, h.Miner)
if err != nil {
return xerrors.Errorf("failed to check if block miner was slashed: %w", err)
}
if slashed {
return xerrors.Errorf("received block was from slashed or invalid miner")
}
mpow, tpow, _, err := stmgr.GetPowerRaw(ctx, syncer.sm, lbst, h.Miner)
if err != nil {
return xerrors.Errorf("failed getting power: %w", err)
}
j := h.ElectionProof.ComputeWinCount(mpow.QualityAdjPower, tpow.QualityAdjPower)
if h.ElectionProof.WinCount != j {
return xerrors.Errorf("miner claims wrong number of wins: miner: %d, computed: %d", h.ElectionProof.WinCount, j)
}
return nil
})
2019-11-11 19:26:14 +00:00
blockSigCheck := async.Err(func() error {
if err := sigs.CheckBlockSignature(ctx, h, waddr); err != nil {
2019-11-11 19:26:14 +00:00
return xerrors.Errorf("check block signature failed: %w", err)
}
return nil
})
beaconValuesCheck := async.Err(func() error {
2020-04-20 17:43:02 +00:00
if os.Getenv("LOTUS_IGNORE_DRAND") == "_yes_" {
return nil
}
if err := beacon.ValidateBlockValues(syncer.beacon, h, baseTs.Height(), *prevBeacon); err != nil {
return xerrors.Errorf("failed to validate blocks random beacon values: %w", err)
}
return nil
})
2019-11-11 19:26:14 +00:00
tktsCheck := async.Err(func() error {
2020-04-08 15:11:42 +00:00
buf := new(bytes.Buffer)
if err := h.Miner.MarshalCBOR(buf); err != nil {
return xerrors.Errorf("failed to marshal miner address to cbor: %w", err)
}
if h.Height > build.UpgradeSmokeHeight {
2020-04-29 22:25:48 +00:00
buf.Write(baseTs.MinTicket().VRFProof)
}
2020-04-29 22:25:48 +00:00
beaconBase := *prevBeacon
if len(h.BeaconEntries) != 0 {
2020-04-29 22:25:48 +00:00
beaconBase = h.BeaconEntries[len(h.BeaconEntries)-1]
}
vrfBase, err := store.DrawRandomness(beaconBase.Data, crypto.DomainSeparationTag_TicketProduction, h.Height-build.TicketRandomnessLookback, buf.Bytes())
2020-04-08 15:11:42 +00:00
if err != nil {
return xerrors.Errorf("failed to compute vrf base for ticket: %w", err)
}
err = VerifyElectionPoStVRF(ctx, waddr, vrfBase, h.Ticket.VRFProof)
2019-11-26 22:53:52 +00:00
if err != nil {
2019-11-11 19:26:14 +00:00
return xerrors.Errorf("validating block tickets failed: %w", err)
}
return nil
})
wproofCheck := async.Err(func() error {
if err := syncer.VerifyWinningPoStProof(ctx, winPoStNv, h, *prevBeacon, lbst, waddr); err != nil {
return xerrors.Errorf("invalid election post: %w", err)
}
return nil
})
await := []async.ErrorFuture{
minerCheck,
tktsCheck,
blockSigCheck,
beaconValuesCheck,
wproofCheck,
winnerCheck,
msgsCheck,
baseFeeCheck,
stateRootCheck,
}
2019-11-11 19:30:49 +00:00
var merr error
for _, fut := range await {
if err := fut.AwaitContext(ctx); err != nil {
2019-11-14 16:14:52 +00:00
merr = multierror.Append(merr, err)
}
}
if merr != nil {
mulErr := merr.(*multierror.Error)
mulErr.ErrorFormat = func(es []error) string {
if len(es) == 1 {
return fmt.Sprintf("1 error occurred:\n\t* %+v\n\n", es[0])
}
points := make([]string, len(es))
for i, err := range es {
points[i] = fmt.Sprintf("* %+v", err)
}
return fmt.Sprintf(
"%d errors occurred:\n\t%s\n\n",
len(es), strings.Join(points, "\n\t"))
}
2020-07-22 18:50:42 +00:00
return mulErr
}
if useCache {
if err := syncer.store.MarkBlockAsValidated(ctx, b.Cid()); err != nil {
return xerrors.Errorf("caching block validation %s: %w", b.Cid(), err)
}
2020-06-14 09:49:20 +00:00
}
return nil
}
func (syncer *Syncer) VerifyWinningPoStProof(ctx context.Context, nv network.Version, h *types.BlockHeader, prevBeacon types.BeaconEntry, lbst cid.Cid, waddr address.Address) error {
if build.InsecurePoStValidation {
if len(h.WinPoStProof) == 0 {
2020-07-14 22:22:25 +00:00
return xerrors.Errorf("[INSECURE-POST-VALIDATION] No winning post proof given")
}
if string(h.WinPoStProof[0].ProofBytes) == "valid proof" {
return nil
}
2020-07-14 22:22:25 +00:00
return xerrors.Errorf("[INSECURE-POST-VALIDATION] winning post was invalid")
}
2020-04-30 20:21:46 +00:00
buf := new(bytes.Buffer)
if err := h.Miner.MarshalCBOR(buf); err != nil {
return xerrors.Errorf("failed to marshal miner address: %w", err)
}
2020-04-30 18:27:22 +00:00
rbase := prevBeacon
if len(h.BeaconEntries) > 0 {
rbase = h.BeaconEntries[len(h.BeaconEntries)-1]
}
rand, err := store.DrawRandomness(rbase.Data, crypto.DomainSeparationTag_WinningPoStChallengeSeed, h.Height, buf.Bytes())
2019-11-21 22:21:45 +00:00
if err != nil {
return xerrors.Errorf("failed to get randomness for verifying winning post proof: %w", err)
2019-11-21 22:21:45 +00:00
}
2020-02-27 21:45:31 +00:00
mid, err := address.IDFromAddress(h.Miner)
if err != nil {
return xerrors.Errorf("failed to get ID from miner address %s: %w", h.Miner, err)
}
sectors, err := stmgr.GetSectorsForWinningPoSt(ctx, nv, syncer.verifier, syncer.sm, lbst, h.Miner, rand)
2019-11-21 22:21:45 +00:00
if err != nil {
2020-04-17 14:47:19 +00:00
return xerrors.Errorf("getting winning post sector set: %w", err)
2019-11-21 22:21:45 +00:00
}
2020-10-08 01:09:33 +00:00
ok, err := ffiwrapper.ProofVerifier.VerifyWinningPoSt(ctx, proof2.WinningPoStVerifyInfo{
Randomness: rand,
Proofs: h.WinPoStProof,
2020-04-17 14:47:19 +00:00
ChallengedSectors: sectors,
Prover: abi.ActorID(mid),
})
2019-11-21 22:21:45 +00:00
if err != nil {
return xerrors.Errorf("failed to verify election post: %w", err)
}
if !ok {
2020-07-22 18:50:42 +00:00
log.Errorf("invalid winning post (block: %s, %x; %v)", h.Cid(), rand, sectors)
2020-04-17 14:47:19 +00:00
return xerrors.Errorf("winning post was invalid")
2019-11-21 22:21:45 +00:00
}
return nil
}
2020-05-11 20:19:35 +00:00
// TODO: We should extract this somewhere else and make the message pool and miner use the same logic
func (syncer *Syncer) checkBlockMessages(ctx context.Context, b *types.FullBlock, baseTs *types.TipSet) error {
2019-12-17 11:20:09 +00:00
{
var sigCids []cid.Cid // this is what we get for people not wanting the marshalcbor method on the cid type
var pubks [][]byte
2019-12-17 11:20:09 +00:00
for _, m := range b.BlsMessages {
sigCids = append(sigCids, m.Cid())
pubk, err := syncer.sm.GetBlsPublicKey(ctx, m.From, baseTs)
if err != nil {
return xerrors.Errorf("failed to load bls public to validate block: %w", err)
}
pubks = append(pubks, pubk)
}
if err := syncer.verifyBlsAggregate(ctx, b.Header.BLSAggregate, sigCids, pubks); err != nil {
return xerrors.Errorf("bls aggregate signature was invalid: %w", err)
}
}
2019-09-27 23:55:15 +00:00
nonces := make(map[address.Address]uint64)
2019-07-05 14:29:17 +00:00
stateroot, _, err := syncer.sm.TipSetState(ctx, baseTs)
if err != nil {
return err
}
st, err := state.LoadStateTree(syncer.store.ActorStore(ctx), stateroot)
if err != nil {
2019-09-27 23:55:15 +00:00
return xerrors.Errorf("failed to load base state tree: %w", err)
2019-07-05 14:29:17 +00:00
}
pl := vm.PricelistByEpoch(baseTs.Height())
var sumGasLimit int64
checkMsg := func(msg types.ChainMsg) error {
m := msg.VMMessage()
2020-05-11 20:19:35 +00:00
// Phase 1: syntactic validation, as defined in the spec
minGas := pl.OnChainMessage(msg.ChainLength())
if err := m.ValidForBlockInclusion(minGas.Total(), syncer.sm.GetNtwkVersion(ctx, b.Header.Height)); err != nil {
return err
2020-05-11 20:19:35 +00:00
}
// ValidForBlockInclusion checks if any single message does not exceed BlockGasLimit
// So below is overflow safe
sumGasLimit += m.GasLimit
if sumGasLimit > build.BlockGasLimit {
return xerrors.Errorf("block gas limit exceeded")
}
2020-05-11 20:19:35 +00:00
// Phase 2: (Partial) semantic validation:
// the sender exists and is an account actor, and the nonces make sense
2019-09-27 23:55:15 +00:00
if _, ok := nonces[m.From]; !ok {
// `GetActor` does not validate that this is an account actor.
2019-09-27 23:55:15 +00:00
act, err := st.GetActor(m.From)
if err != nil {
return xerrors.Errorf("failed to get actor: %w", err)
}
2020-05-11 20:19:35 +00:00
2020-09-29 04:24:38 +00:00
if !builtin.IsAccountActor(act.Code) {
2020-05-11 20:19:35 +00:00
return xerrors.New("Sender must be an account actor")
}
2019-09-27 23:55:15 +00:00
nonces[m.From] = act.Nonce
}
2019-09-27 23:55:15 +00:00
if nonces[m.From] != m.Nonce {
return xerrors.Errorf("wrong nonce (exp: %d, got: %d)", nonces[m.From], m.Nonce)
2019-07-05 14:29:17 +00:00
}
2019-09-27 23:55:15 +00:00
nonces[m.From]++
2019-07-05 14:29:17 +00:00
2019-09-27 23:55:15 +00:00
return nil
2019-07-05 14:29:17 +00:00
}
// Validate message arrays in a temporary blockstore.
tmpbs := bstore.NewMemory()
2020-10-08 01:09:33 +00:00
tmpstore := blockadt.WrapStore(ctx, cbor.NewCborStore(tmpbs))
2020-10-08 01:09:33 +00:00
bmArr := blockadt.MakeEmptyArray(tmpstore)
2019-09-27 23:55:15 +00:00
for i, m := range b.BlsMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid bls message at index %d: %w", i, err)
2019-09-27 23:55:15 +00:00
}
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
k := cbg.CborCid(c)
if err := bmArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put bls message at index %d: %w", i, err)
}
2019-07-05 14:29:17 +00:00
}
2020-10-08 01:09:33 +00:00
smArr := blockadt.MakeEmptyArray(tmpstore)
2019-09-27 23:55:15 +00:00
for i, m := range b.SecpkMessages {
if err := checkMsg(m); err != nil {
return xerrors.Errorf("block had invalid secpk message at index %d: %w", i, err)
2019-09-27 23:55:15 +00:00
}
// `From` being an account actor is only validated inside the `vm.ResolveToKeyAddr` call
// in `StateManager.ResolveToKeyAddress` here (and not in `checkMsg`).
kaddr, err := syncer.sm.ResolveToKeyAddress(ctx, m.Message.From, baseTs)
if err != nil {
return xerrors.Errorf("failed to resolve key addr: %w", err)
}
if err := sigs.Verify(&m.Signature, kaddr, m.Message.Cid().Bytes()); err != nil {
return xerrors.Errorf("secpk message %s has invalid signature: %w", m.Cid(), err)
}
c, err := store.PutMessage(tmpbs, m)
if err != nil {
return xerrors.Errorf("failed to store message %s: %w", m.Cid(), err)
}
k := cbg.CborCid(c)
if err := smArr.Set(uint64(i), &k); err != nil {
return xerrors.Errorf("failed to put secpk message at index %d: %w", i, err)
}
}
bmroot, err := bmArr.Root()
if err != nil {
return err
}
smroot, err := smArr.Root()
if err != nil {
return err
}
mrcid, err := tmpstore.Put(ctx, &types.MsgMeta{
BlsMessages: bmroot,
SecpkMessages: smroot,
})
if err != nil {
return err
}
if b.Header.Messages != mrcid {
return fmt.Errorf("messages didnt match message root in header")
2019-07-05 14:29:17 +00:00
}
// Finally, flush.
2021-03-02 21:29:24 +00:00
return vm.Copy(ctx, tmpbs, syncer.store.ChainBlockstore(), mrcid)
2019-07-05 14:29:17 +00:00
}
func (syncer *Syncer) verifyBlsAggregate(ctx context.Context, sig *crypto.Signature, msgs []cid.Cid, pubks [][]byte) error {
2019-12-05 01:18:30 +00:00
_, span := trace.StartSpan(ctx, "syncer.verifyBlsAggregate")
2019-10-13 09:08:34 +00:00
defer span.End()
span.AddAttributes(
trace.Int64Attribute("msgCount", int64(len(msgs))),
)
msgsS := make([]ffi.Message, len(msgs))
pubksS := make([]ffi.PublicKey, len(msgs))
for i := 0; i < len(msgs); i++ {
msgsS[i] = msgs[i].Bytes()
copy(pubksS[i][:], pubks[i][:ffi.PublicKeyBytes])
}
sigS := new(ffi.Signature)
copy(sigS[:], sig.Data[:ffi.SignatureBytes])
if len(msgs) == 0 {
return nil
}
valid := ffi.HashVerify(sigS, msgsS, pubksS)
if !valid {
return xerrors.New("bls aggregate signature failed to verify")
}
return nil
}
type syncStateKey struct{}
2019-11-15 21:35:29 +00:00
func extractSyncState(ctx context.Context) *SyncerState {
v := ctx.Value(syncStateKey{})
2019-11-15 21:35:29 +00:00
if v != nil {
return v.(*SyncerState)
}
return nil
}
2020-06-23 21:51:25 +00:00
// collectHeaders collects the headers from the blocks between any two tipsets.
//
2020-07-08 20:02:28 +00:00
// `incoming` is the heaviest/projected/target tipset we have learned about, and
// `known` is usually an anchor tipset we already have in our view of the chain
2020-06-23 21:51:25 +00:00
// (which could be the genesis).
//
// collectHeaders checks if portions of the chain are in our ChainStore; falling
// down to the network to retrieve the missing parts. If during the process, any
// portion we receive is in our denylist (bad list), we short-circuit.
//
// {hint/usage}: This is used by collectChain, which is in turn called from the
// main Sync method (Syncer#Sync), so it's a pretty central method.
//
// {hint/logic}: The logic of this method is as follows:
//
// 1. Check that the from tipset is not linked to a parent block known to be
// bad.
// 2. Check the consistency of beacon entries in the from tipset. We check
// total equality of the BeaconEntries in each block.
2020-07-08 20:02:28 +00:00
// 3. Traverse the chain backwards, for each tipset:
2020-06-23 21:51:25 +00:00
// 3a. Load it from the chainstore; if found, it move on to its parent.
// 3b. Query our peers via client in batches, requesting up to a
2020-06-23 21:51:25 +00:00
// maximum of 500 tipsets every time.
//
// Once we've concluded, if we find a mismatching tipset at the height where the
// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork
// to resolve it. Refer to the godocs there.
//
// All throughout the process, we keep checking if the received blocks are in
// the deny list, and short-circuit the process if so.
2020-07-08 20:02:28 +00:00
func (syncer *Syncer) collectHeaders(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
ctx, span := trace.StartSpan(ctx, "collectHeaders")
defer span.End()
2019-11-15 21:35:29 +00:00
ss := extractSyncState(ctx)
span.AddAttributes(
2020-07-10 19:31:58 +00:00
trace.Int64Attribute("incomingHeight", int64(incoming.Height())),
trace.Int64Attribute("knownHeight", int64(known.Height())),
)
2020-06-23 21:51:25 +00:00
// Check if the parents of the from block are in the denylist.
// i.e. if a fork of the chain has been requested that we know to be bad.
2020-07-08 20:02:28 +00:00
for _, pcid := range incoming.Parents().Cids() {
if reason, ok := syncer.bad.Has(pcid); ok {
newReason := reason.Linked("linked to %s", pcid)
2020-07-08 20:02:28 +00:00
for _, b := range incoming.Cids() {
syncer.bad.Add(b, newReason)
}
2020-07-08 20:02:28 +00:00
return nil, xerrors.Errorf("chain linked to block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), pcid, reason)
2019-11-10 23:06:06 +00:00
}
}
{
// ensure consistency of beacon entires
2020-07-08 20:02:28 +00:00
targetBE := incoming.Blocks()[0].BeaconEntries
sorted := sort.SliceIsSorted(targetBE, func(i, j int) bool {
return targetBE[i].Round < targetBE[j].Round
})
if !sorted {
2020-07-08 20:02:28 +00:00
syncer.bad.Add(incoming.Cids()[0], NewBadBlockReason(incoming.Cids(), "wrong order of beacon entires"))
return nil, xerrors.Errorf("wrong order of beacon entires")
}
2020-07-08 20:02:28 +00:00
for _, bh := range incoming.Blocks()[1:] {
if len(targetBE) != len(bh.BeaconEntries) {
// cannot mark bad, I think @Kubuxu
return nil, xerrors.Errorf("tipset contained different number for beacon entires")
}
for i, be := range bh.BeaconEntries {
if targetBE[i].Round != be.Round || !bytes.Equal(targetBE[i].Data, be.Data) {
// cannot mark bad, I think @Kubuxu
return nil, xerrors.Errorf("tipset contained different beacon entires")
}
}
}
}
2020-07-08 20:02:28 +00:00
blockSet := []*types.TipSet{incoming}
2019-07-05 14:29:17 +00:00
// Parent of the new (possibly better) tipset that we need to fetch next.
2020-07-08 20:02:28 +00:00
at := incoming.Parents()
2019-07-05 14:29:17 +00:00
// we want to sync all the blocks until the height above our
// best tipset so far
2020-07-08 20:02:28 +00:00
untilHeight := known.Height() + 1
2019-11-15 21:35:29 +00:00
ss.SetHeight(blockSet[len(blockSet)-1].Height())
var acceptedBlocks []cid.Cid
loop:
for blockSet[len(blockSet)-1].Height() > untilHeight {
2019-12-16 19:22:56 +00:00
for _, bc := range at.Cids() {
if reason, ok := syncer.bad.Has(bc); ok {
newReason := reason.Linked("change contained %s", bc)
for _, b := range acceptedBlocks {
syncer.bad.Add(b, newReason)
}
2020-07-08 20:02:28 +00:00
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
2019-10-09 08:50:57 +00:00
}
}
// If, for some reason, we have a suffix of the chain locally, handle that here
2019-07-31 07:13:49 +00:00
ts, err := syncer.store.LoadTipSet(at)
if err == nil {
2019-12-16 19:22:56 +00:00
acceptedBlocks = append(acceptedBlocks, at.Cids()...)
blockSet = append(blockSet, ts)
at = ts.Parents()
continue
}
if !xerrors.Is(err, bstore.ErrNotFound) {
2020-11-24 11:09:48 +00:00
log.Warnf("loading local tipset: %s", err)
2019-07-31 07:13:49 +00:00
}
2019-07-26 18:16:57 +00:00
2019-07-31 07:13:49 +00:00
// NB: GetBlocks validates that the blocks are in-fact the ones we
2020-06-23 21:51:25 +00:00
// requested, and that they are correctly linked to one another. It does
// not validate any state transitions.
window := 500
2019-09-09 20:03:10 +00:00
if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
window = gap
}
blks, err := syncer.Exchange.GetBlocks(ctx, at, window)
2019-07-31 07:13:49 +00:00
if err != nil {
// Most likely our peers aren't fully synced yet, but forwarded
// new block message (ideally we'd find better peers)
2019-07-26 18:16:57 +00:00
log.Errorf("failed to get blocks: %+v", err)
2019-07-26 18:16:57 +00:00
span.AddAttributes(trace.StringAttribute("error", err.Error()))
2019-07-31 07:13:49 +00:00
// This error will only be logged above,
return nil, xerrors.Errorf("failed to get blocks: %w", err)
}
log.Info("Got blocks: ", blks[0].Height(), len(blks))
2019-07-26 18:16:57 +00:00
// Check that the fetched segment of the chain matches what we already
// have. Since we fetch from the head backwards our reassembled chain
// is sorted in reverse here: we have a child -> parent order, our last
// tipset then should be child of the first tipset retrieved.
// FIXME: The reassembly logic should be part of the `client`
// service, the consumer should not be concerned with the
// `MaxRequestLength` limitation, it should just be able to request
// an segment of arbitrary length. The same burden is put on
// `syncFork()` which needs to be aware this as well.
if blockSet[len(blockSet)-1].IsChildOf(blks[0]) == false {
return nil, xerrors.Errorf("retrieved segments of the chain are not connected at heights %d/%d",
blockSet[len(blockSet)-1].Height(), blks[0].Height())
// A successful `GetBlocks()` call is guaranteed to fetch at least
// one tipset so the acess `blks[0]` is safe.
}
2019-07-31 07:13:49 +00:00
for _, b := range blks {
if b.Height() < untilHeight {
2019-09-06 20:03:28 +00:00
break loop
}
2019-10-09 08:50:57 +00:00
for _, bc := range b.Cids() {
if reason, ok := syncer.bad.Has(bc); ok {
newReason := reason.Linked("change contained %s", bc)
for _, b := range acceptedBlocks {
syncer.bad.Add(b, newReason)
}
2020-07-08 20:02:28 +00:00
return nil, xerrors.Errorf("chain contained block marked previously as bad (%s, %s) (reason: %s)", incoming.Cids(), bc, reason)
2019-10-09 08:50:57 +00:00
}
}
2019-07-31 07:13:49 +00:00
blockSet = append(blockSet, b)
}
2019-07-26 18:16:57 +00:00
2019-12-16 19:22:56 +00:00
acceptedBlocks = append(acceptedBlocks, at.Cids()...)
2019-11-15 21:35:29 +00:00
ss.SetHeight(blks[len(blks)-1].Height())
2019-07-31 07:13:49 +00:00
at = blks[len(blks)-1].Parents()
}
base := blockSet[len(blockSet)-1]
if base.Equals(known) {
blockSet = blockSet[:len(blockSet)-1]
base = blockSet[len(blockSet)-1]
}
if base.IsChildOf(known) {
// common case: receiving blocks that are building on top of our best tipset
return blockSet, nil
}
knownParent, err := syncer.store.LoadTipSet(known.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
if base.IsChildOf(knownParent) {
// common case: receiving a block thats potentially part of the same tipset as our best block
return blockSet, nil
}
2020-06-23 21:51:25 +00:00
// We have now ascertained that this is *not* a 'fast forward'
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
fork, err := syncer.syncFork(ctx, base, known)
if err != nil {
if xerrors.Is(err, ErrForkTooLong) || xerrors.Is(err, ErrForkCheckpoint) {
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
log.Warn("adding forked chain to our bad tipset cache")
for _, b := range incoming.Blocks() {
syncer.bad.Add(b.Cid(), NewBadBlockReason(incoming.Cids(), "fork past finality"))
2019-11-10 23:06:06 +00:00
}
}
return nil, xerrors.Errorf("failed to sync fork: %w", err)
2019-07-31 07:13:49 +00:00
}
2019-07-26 18:16:57 +00:00
blockSet = append(blockSet, fork...)
2019-07-31 07:13:49 +00:00
return blockSet, nil
}
2019-07-26 18:16:57 +00:00
2019-11-10 23:06:06 +00:00
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
var ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")
2019-11-10 23:06:06 +00:00
2020-06-23 21:51:25 +00:00
// syncFork tries to obtain the chain fragment that links a fork into a common
// ancestor in our view of the chain.
//
// If the fork is too long (build.ForkLengthThreshold), or would cause us to diverge from the checkpoint (ErrForkCheckpoint),
// we add the entire subchain to the denylist. Else, we find the common ancestor, and add the missing chain
2020-06-23 21:51:25 +00:00
// fragment until the fork point to the returned []TipSet.
2020-07-08 20:02:28 +00:00
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
chkpt := syncer.GetCheckpoint()
if known.Key() == chkpt {
return nil, ErrForkCheckpoint
}
// TODO: Does this mean we always ask for ForkLengthThreshold blocks from the network, even if we just need, like, 2? Yes.
// Would it not be better to ask in smaller chunks, given that an ~ForkLengthThreshold is very rare?
tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
if err != nil {
return nil, err
}
2020-07-08 20:02:28 +00:00
nts, err := syncer.store.LoadTipSet(known.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load next local tipset: %w", err)
}
// Track the fork length on our side of the synced chain to enforce
// `ForkLengthThreshold`. Initialized to 1 because we already walked back
// one tipset from `known` (our synced head).
forkLengthInHead := 1
for cur := 0; cur < len(tips); {
if nts.Height() == 0 {
if !syncer.Genesis.Equals(nts) {
return nil, xerrors.Errorf("somehow synced chain that linked back to a different genesis (bad genesis: %s)", nts.Key())
}
2020-08-07 18:36:24 +00:00
return nil, xerrors.Errorf("synced chain forked at genesis, refusing to sync; incoming: %s", incoming.Cids())
}
if nts.Equals(tips[cur]) {
return tips[:cur+1], nil
}
if nts.Height() < tips[cur].Height() {
cur++
} else {
// Walk back one block in our synced chain to try to meet the fork's
// height.
forkLengthInHead++
if forkLengthInHead > int(build.ForkLengthThreshold) {
return nil, ErrForkTooLong
}
// We will be forking away from nts, check that it isn't checkpointed
if nts.Key() == chkpt {
return nil, ErrForkCheckpoint
}
nts, err = syncer.store.LoadTipSet(nts.Parents())
if err != nil {
return nil, xerrors.Errorf("loading next local tipset: %w", err)
}
}
}
2019-11-10 23:06:06 +00:00
return nil, ErrForkTooLong
}
2019-10-10 11:13:26 +00:00
func (syncer *Syncer) syncMessagesAndCheckState(ctx context.Context, headers []*types.TipSet) error {
2019-11-15 21:35:29 +00:00
ss := extractSyncState(ctx)
ss.SetHeight(headers[len(headers)-1].Height())
return syncer.iterFullTipsets(ctx, headers, func(ctx context.Context, fts *store.FullTipSet) error {
log.Debugw("validating tipset", "height", fts.TipSet().Height(), "size", len(fts.TipSet().Cids()))
if err := syncer.ValidateTipSet(ctx, fts, true); err != nil {
log.Errorf("failed to validate tipset: %+v", err)
return xerrors.Errorf("message processing failed: %w", err)
}
2019-09-06 20:03:28 +00:00
stats.Record(ctx, metrics.ChainNodeWorkerHeight.M(int64(fts.TipSet().Height())))
2019-11-15 21:35:29 +00:00
ss.SetHeight(fts.TipSet().Height())
return nil
})
}
// fills out each of the given tipsets with messages and calls the callback with it
func (syncer *Syncer) iterFullTipsets(ctx context.Context, headers []*types.TipSet, cb func(context.Context, *store.FullTipSet) error) error {
ss := extractSyncState(ctx)
ctx, span := trace.StartSpan(ctx, "iterFullTipsets")
defer span.End()
span.AddAttributes(trace.Int64Attribute("num_headers", int64(len(headers))))
for i := len(headers) - 1; i >= 0; {
fts, err := syncer.store.TryFillTipSet(headers[i])
if err != nil {
return err
}
if fts != nil {
if err := cb(ctx, fts); err != nil {
return err
}
i--
continue
}
2019-07-26 18:16:57 +00:00
2020-09-16 18:04:44 +00:00
batchSize := concurrentSyncRequests * syncRequestBatchSize
if i < batchSize {
2020-09-21 06:21:25 +00:00
batchSize = i + 1
}
ss.SetStage(api.StageFetchingMessages)
startOffset := i + 1 - batchSize
bstout, batchErr := syncer.fetchMessages(ctx, headers[startOffset:startOffset+batchSize], startOffset)
ss.SetStage(api.StageMessages)
2020-04-01 18:35:09 +00:00
2020-09-16 18:04:44 +00:00
if batchErr != nil {
return xerrors.Errorf("failed to fetch messages: %w", batchErr)
2019-07-31 07:13:49 +00:00
}
2020-04-01 18:35:09 +00:00
for bsi := 0; bsi < len(bstout); bsi++ {
2019-09-06 20:03:28 +00:00
// temp storage so we don't persist data we dont want to
bs := bstore.NewMemory()
blks := cbor.NewCborStore(bs)
2019-09-06 20:03:28 +00:00
this := headers[i-bsi]
2020-04-01 18:35:09 +00:00
bstip := bstout[len(bstout)-(bsi+1)]
2020-07-27 15:31:36 +00:00
fts, err := zipTipSetAndMessages(blks, this, bstip.Bls, bstip.Secpk, bstip.BlsIncludes, bstip.SecpkIncludes)
if err != nil {
log.Warnw("zipping failed", "error", err, "bsi", bsi, "i", i,
2020-07-27 15:31:36 +00:00
"height", this.Height(),
2020-04-01 18:35:09 +00:00
"next-height", i+batchSize)
2019-07-31 07:13:49 +00:00
return xerrors.Errorf("message processing failed: %w", err)
}
2019-07-26 18:16:57 +00:00
if err := cb(ctx, fts); err != nil {
return err
}
2019-07-26 18:16:57 +00:00
if err := persistMessages(ctx, bs, bstip); err != nil {
2019-09-06 20:03:28 +00:00
return err
}
if err := copyBlockstore(ctx, bs, syncer.store.ChainBlockstore()); err != nil {
2019-09-06 20:03:28 +00:00
return xerrors.Errorf("message processing failed: %w", err)
}
2019-07-05 14:29:17 +00:00
}
i -= batchSize
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2019-07-31 07:13:49 +00:00
return nil
}
2019-07-05 14:29:17 +00:00
func (syncer *Syncer) fetchMessages(ctx context.Context, headers []*types.TipSet, startOffset int) ([]*exchange.CompactedMessages, error) {
batchSize := len(headers)
batch := make([]*exchange.CompactedMessages, batchSize)
var wg sync.WaitGroup
var mx sync.Mutex
var batchErr error
start := build.Clock.Now()
for j := 0; j < batchSize; j += syncRequestBatchSize {
wg.Add(1)
go func(j int) {
defer wg.Done()
nreq := syncRequestBatchSize
if j+nreq > batchSize {
nreq = batchSize - j
}
failed := false
for offset := 0; !failed && offset < nreq; {
nextI := j + offset
lastI := j + nreq
var requestErr error
var requestResult []*exchange.CompactedMessages
for retry := 0; requestResult == nil && retry < syncRequestRetries; retry++ {
if retry > 0 {
log.Infof("fetching messages at %d (retry %d)", startOffset+nextI, retry)
} else {
log.Infof("fetching messages at %d", startOffset+nextI)
}
result, err := syncer.Exchange.GetChainMessages(ctx, headers[nextI:lastI])
if err != nil {
requestErr = multierror.Append(requestErr, err)
} else {
requestResult = result
}
}
mx.Lock()
if requestResult != nil {
copy(batch[j+offset:], requestResult)
offset += len(requestResult)
} else {
log.Errorf("error fetching messages at %d: %s", nextI, requestErr)
batchErr = multierror.Append(batchErr, requestErr)
failed = true
}
mx.Unlock()
}
}(j)
}
wg.Wait()
2020-09-17 14:35:40 +00:00
if batchErr != nil {
return nil, batchErr
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
2020-09-17 14:35:40 +00:00
log.Infof("fetching messages for %d tipsets at %d done; took %s", batchSize, startOffset, build.Clock.Since(start))
2020-08-28 19:52:40 +00:00
return batch, nil
2019-07-31 07:13:49 +00:00
}
2019-07-05 14:29:17 +00:00
func persistMessages(ctx context.Context, bs bstore.Blockstore, bst *exchange.CompactedMessages) error {
_, span := trace.StartSpan(ctx, "persistMessages")
defer span.End()
2020-07-27 15:31:36 +00:00
for _, m := range bst.Bls {
2019-09-06 20:03:28 +00:00
//log.Infof("putting BLS message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Errorf("failed to persist messages: %+v", err)
2019-09-06 20:03:28 +00:00
return xerrors.Errorf("BLS message processing failed: %w", err)
}
2019-09-06 20:03:28 +00:00
}
2020-07-27 15:31:36 +00:00
for _, m := range bst.Secpk {
2020-02-12 23:52:36 +00:00
if m.Signature.Type != crypto.SigTypeSecp256k1 {
return xerrors.Errorf("unknown signature type on message %s: %q", m.Cid(), m.Signature.Type)
2019-09-06 20:03:28 +00:00
}
//log.Infof("putting secp256k1 message: %s", m.Cid())
if _, err := store.PutMessage(bs, m); err != nil {
log.Errorf("failed to persist messages: %+v", err)
2019-09-06 20:03:28 +00:00
return xerrors.Errorf("secp256k1 message processing failed: %w", err)
}
}
return nil
}
2020-06-23 21:51:25 +00:00
// collectChain tries to advance our view of the chain to the purported head.
//
// It goes through various stages:
//
// 1. StageHeaders: we proceed in the sync process by requesting block headers
// from our peers, moving back from their heads, until we reach a tipset
// that we have in common (such a common tipset must exist, thought it may
// simply be the genesis block).
//
// If the common tipset is our head, we treat the sync as a "fast-forward",
// else we must drop part of our chain to connect to the peer's head
// (referred to as "forking").
//
// 2. StagePersistHeaders: now that we've collected the missing headers,
// augmented by those on the other side of a fork, we persist them to the
// BlockStore.
//
// 3. StageMessages: having acquired the headers and found a common tipset,
// we then move forward, requesting the full blocks, including the messages.
func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet, hts *types.TipSet) error {
ctx, span := trace.StartSpan(ctx, "collectChain")
defer span.End()
2019-11-15 21:35:29 +00:00
ss := extractSyncState(ctx)
ss.Init(hts, ts)
headers, err := syncer.collectHeaders(ctx, ts, hts)
2019-07-31 07:13:49 +00:00
if err != nil {
2019-12-04 03:56:29 +00:00
ss.Error(err)
2019-07-31 07:13:49 +00:00
return err
}
span.AddAttributes(trace.Int64Attribute("syncChainLength", int64(len(headers))))
2019-10-10 03:04:10 +00:00
if !headers[0].Equals(ts) {
log.Errorf("collectChain headers[0] should be equal to sync target. Its not: %s != %s", headers[0].Cids(), ts.Cids())
}
2019-11-15 21:35:29 +00:00
ss.SetStage(api.StagePersistHeaders)
2020-05-12 17:58:12 +00:00
toPersist := make([]*types.BlockHeader, 0, len(headers)*int(build.BlocksPerEpoch))
2019-07-31 07:13:49 +00:00
for _, ts := range headers {
2019-11-12 10:18:46 +00:00
toPersist = append(toPersist, ts.Blocks()...)
}
if err := syncer.store.PersistBlockHeaders(toPersist...); err != nil {
2019-12-04 03:56:29 +00:00
err = xerrors.Errorf("failed to persist synced blocks to the chainstore: %w", err)
ss.Error(err)
return err
2019-07-05 14:29:17 +00:00
}
2019-11-12 10:18:46 +00:00
toPersist = nil
2019-07-05 14:29:17 +00:00
2019-11-15 21:35:29 +00:00
ss.SetStage(api.StageMessages)
2019-10-10 11:13:26 +00:00
if err := syncer.syncMessagesAndCheckState(ctx, headers); err != nil {
2019-12-04 03:56:29 +00:00
err = xerrors.Errorf("collectChain syncMessages: %w", err)
ss.Error(err)
return err
2019-07-31 07:13:49 +00:00
}
2019-11-15 21:35:29 +00:00
ss.SetStage(api.StageSyncComplete)
2019-11-07 00:18:06 +00:00
log.Debugw("new tipset", "height", ts.Height(), "tipset", types.LogCids(ts.Cids()))
2019-07-31 07:13:49 +00:00
return nil
2019-07-05 14:36:08 +00:00
}
func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []byte, evrf []byte) error {
2020-06-23 21:51:25 +00:00
return gen.VerifyVRF(ctx, worker, rand, evrf)
}
func (syncer *Syncer) State() []SyncerStateSnapshot {
2020-09-14 20:58:59 +00:00
return syncer.syncmgr.State()
}
2020-06-23 21:51:25 +00:00
// MarkBad manually adds a block to the "bad blocks" cache.
func (syncer *Syncer) MarkBad(blk cid.Cid) {
syncer.bad.Add(blk, NewBadBlockReason([]cid.Cid{blk}, "manually marked bad"))
}
2020-09-09 07:24:09 +00:00
// UnmarkBad manually adds a block to the "bad blocks" cache.
func (syncer *Syncer) UnmarkBad(blk cid.Cid) {
syncer.bad.Remove(blk)
}
2020-10-10 08:26:42 +00:00
func (syncer *Syncer) UnmarkAllBad() {
syncer.bad.Purge()
}
func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
bbr, ok := syncer.bad.Has(blk)
return bbr.String(), ok
}
2020-06-23 21:51:25 +00:00
func (syncer *Syncer) getLatestBeaconEntry(_ context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
cur := ts
for i := 0; i < 20; i++ {
cbe := cur.Blocks()[0].BeaconEntries
if len(cbe) > 0 {
return &cbe[len(cbe)-1], nil
}
if cur.Height() == 0 {
return nil, xerrors.Errorf("made it back to genesis block without finding beacon entry")
}
next, err := syncer.store.LoadTipSet(cur.Parents())
if err != nil {
return nil, xerrors.Errorf("failed to load parents when searching back for latest beacon entry: %w", err)
}
cur = next
}
return nil, xerrors.Errorf("found NO beacon entries in the 20 latest tipsets")
}
func (syncer *Syncer) IsEpochBeyondCurrMax(epoch abi.ChainEpoch) bool {
if syncer.Genesis == nil {
return false
}
2020-07-10 14:43:14 +00:00
now := uint64(build.Clock.Now().Unix())
return epoch > (abi.ChainEpoch((now-syncer.Genesis.MinTimestamp())/build.BlockDelaySecs) + MaxHeightDrift)
}