some initial godocs. (#2118)
commit c8104a03e6 (parent e864244226)
@@ -17,6 +17,10 @@ type Response struct {
 	Err error
 }
 
+// RandomBeacon represents a system that provides randomness to Lotus.
+// Other components interrogate the RandomBeacon to acquire randomness that's
+// valid for a specific chain epoch. Also to verify beacon entries that have
+// been posted on chain.
 type RandomBeacon interface {
 	Entry(context.Context, uint64) <-chan Response
 	VerifyEntry(types.BeaconEntry, types.BeaconEntry) error
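As a reading aid for the new godoc above, here is a minimal consumer sketch (not part of this commit). It assumes Response also carries the fetched entry in a field named Entry alongside Err, and that the lotus beacon and types packages live under their usual import paths.

package example // illustration only

import (
	"context"

	"github.com/filecoin-project/lotus/chain/beacon"
	"github.com/filecoin-project/lotus/chain/types"
)

// fetchRandomness asks the beacon for the entry of a given round and waits
// until it arrives or the context is cancelled.
func fetchRandomness(ctx context.Context, rb beacon.RandomBeacon, round uint64) (*types.BeaconEntry, error) {
	select {
	case resp := <-rb.Entry(ctx, round):
		if resp.Err != nil {
			return nil, resp.Err
		}
		return &resp.Entry, nil // Entry field is an assumption, not shown in this hunk
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}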
@@ -41,6 +41,13 @@ func (dp *drandPeer) IsTLS() bool {
 	return dp.tls
 }
 
+// DrandBeacon connects Lotus with a drand network in order to provide
+// randomness to the system in a way that's aligned with Filecoin rounds/epochs.
+//
+// We connect to drand peers via their public HTTP endpoints. The peers are
+// enumerated in the drandServers variable.
+//
+// The root trust for the Drand chain is configured from build.DrandChain.
 type DrandBeacon struct {
 	client dclient.Client
 
@@ -10,6 +10,7 @@ import (
 	"golang.org/x/xerrors"
 
 	cborutil "github.com/filecoin-project/go-cbor-util"
+
 	"github.com/filecoin-project/lotus/chain/store"
 	"github.com/filecoin-project/lotus/chain/types"
 
@@ -27,6 +28,24 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1"
 
 const BlockSyncMaxRequestLength = 800
 
+// BlockSyncService is the component that services BlockSync requests from
+// peers.
+//
+// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync
+// is an RPC-oriented protocol, with a single operation to request blocks.
+//
+// A request contains a start anchor block (referred to with a CID), and an
+// amount of blocks requested beyond the anchor (including the anchor itself).
+//
+// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports
+// two options at the moment:
+//
+//  - include block contents
+//  - include block messages
+//
+// The response will include a status code, an optional message, and the
+// response payload in case of success. The payload is a slice of serialized
+// tipsets.
 type BlockSyncService struct {
 	cs *store.ChainStore
 }
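The request options described above are plain bits in a uint64. A small sketch of how a client might assemble them, assumed to live in the blocksync package: BSOptBlocks is visible in the surrounding code, while BSOptMessages is assumed to be its companion constant.

// requestOptions encodes the two currently supported BlockSync options into
// the 64-bit bitfield carried by a request.
func requestOptions(wantBlocks, wantMessages bool) uint64 {
	var opts uint64
	if wantBlocks {
		opts |= BSOptBlocks // include block contents
	}
	if wantMessages {
		opts |= BSOptMessages // include block messages (assumed constant name)
	}
	return opts
}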
@@ -64,6 +64,11 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse
 	}
 }
 
+// GetBlocks fetches count blocks from the network, from the provided tipset
+// *backwards*, returning as many tipsets as count.
+//
+// {hint/usage}: This is used by the Syncer during normal chain syncing and when
+// resolving forks.
 func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) {
 	ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks")
 	defer span.End()
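A caller-side usage sketch for the newly documented GetBlocks; the helper name and the choice of 10 tipsets are illustrative, the signature comes from the hunk above.

package example // illustration only

import (
	"context"

	"github.com/filecoin-project/lotus/chain/blocksync"
	"github.com/filecoin-project/lotus/chain/types"
)

// fetchRecent walks backwards from head, returning up to 10 tipsets
// (the head's tipset included), as the godoc above describes.
func fetchRecent(ctx context.Context, bs *blocksync.BlockSync, head *types.TipSet) ([]*types.TipSet, error) {
	return bs.GetBlocks(ctx, head.Key(), 10)
}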
@@ -80,7 +85,9 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i
 		Options: BSOptBlocks,
 	}
 
+	// this peerset is sorted by latency and failure counting.
 	peers := bs.getPeers()
+
 	// randomize the first few peers so we don't always pick the same peer
 	shufflePrefix(peers)
 
@@ -356,6 +363,7 @@ func (bs *BlockSync) RemovePeer(p peer.ID) {
 	bs.syncPeers.removePeer(p)
 }
 
+// getPeers returns a preference-sorted set of peers to query.
 func (bs *BlockSync) getPeers() []peer.ID {
 	return bs.syncPeers.prefSortedPeers()
 }
@@ -32,8 +32,11 @@ func (fts *FullTipSet) Cids() []cid.Cid {
 	return cids
 }
 
+// TipSet returns a narrower view of this FullTipSet, eliding the block
+// messages.
 func (fts *FullTipSet) TipSet() *types.TipSet {
 	if fts.tipset != nil {
+		// FIXME: fts.tipset is actually never set. Should it memoize?
 		return fts.tipset
 	}
 
@@ -34,7 +34,7 @@ type lbEntry struct {
 	target types.TipSetKey
 }
 
-func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
+func (ci *ChainIndex) GetTipsetByHeight(_ context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) {
 	if from.Height()-to <= ci.skipLength {
 		return ci.walkBack(from, to)
 	}
@@ -52,6 +52,15 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation")
 // ReorgNotifee represents a callback that gets called upon reorgs.
 type ReorgNotifee func(rev, app []*types.TipSet) error
 
+// ChainStore is the main point of access to chain data.
+//
+// Raw chain data is stored in the Blockstore, with relevant markers (genesis,
+// latest head tipset references) being tracked in the Datastore (key-value
+// store).
+//
+// To alleviate disk access, the ChainStore has two ARC caches:
+//  1. a tipset cache
+//  2. a block => messages references cache.
 type ChainStore struct {
 	bs bstore.Blockstore
 	ds dstore.Datastore
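The godoc above mentions two ARC caches. For orientation, here is a sketch of the kind of cache it refers to, built on hashicorp/golang-lru's ARC implementation; the wrapper type, its methods and the cache size are illustrative, not the ChainStore's actual fields.

package example // illustration only

import (
	lru "github.com/hashicorp/golang-lru"

	"github.com/filecoin-project/lotus/chain/types"
)

// tipSetCache maps tipset keys to loaded tipsets so repeated loads can skip
// the datastore, in the spirit of the ChainStore's tipset cache.
type tipSetCache struct {
	arc *lru.ARCCache
}

func newTipSetCache(size int) (*tipSetCache, error) {
	arc, err := lru.NewARC(size)
	if err != nil {
		return nil, err
	}
	return &tipSetCache{arc: arc}, nil
}

func (c *tipSetCache) get(k types.TipSetKey) (*types.TipSet, bool) {
	v, ok := c.arc.Get(k)
	if !ok {
		return nil, false
	}
	return v.(*types.TipSet), true
}

func (c *tipSetCache) put(ts *types.TipSet) {
	c.arc.Add(ts.Key(), ts)
}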
@@ -266,6 +275,9 @@ func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
 	return nil
 }
 
+// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our
+// internal state as our new head, if and only if it is heavier than the current
+// head.
 func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
 	cs.heaviestLk.Lock()
 	defer cs.heaviestLk.Unlock()
@@ -331,6 +343,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo
 	return out
 }
 
+// takeHeaviestTipSet actually sets the incoming tipset as our head both in
+// memory and in the ChainStore. It also sends a notification to deliver to
+// ReorgNotifees.
 func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error {
 	_, span := trace.StartSpan(ctx, "takeHeaviestTipSet")
 	defer span.End()
@@ -368,6 +383,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error {
 	return cs.takeHeaviestTipSet(context.TODO(), ts)
 }
 
+// Contains returns whether our BlockStore has all blocks in the supplied TipSet.
 func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
 	for _, c := range ts.Cids() {
 		has, err := cs.bs.Has(c)
@@ -382,6 +398,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) {
 	return true, nil
 }
 
+// GetBlock fetches a BlockHeader with the supplied CID. It returns
+// blockstore.ErrNotFound if the block was not found in the BlockStore.
 func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) {
 	sb, err := cs.bs.Get(c)
 	if err != nil {
@@ -474,6 +492,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti
 	return leftChain, rightChain, nil
 }
 
+// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head).
 func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet {
 	cs.heaviestLk.Lock()
 	defer cs.heaviestLk.Unlock()
chain/sync.go (127 changes)
@@ -53,6 +53,29 @@ var log = logging.Logger("chain")
 
 var LocalIncoming = "incoming"
 
+// Syncer is in charge of running the chain synchronization logic. As such, it
+// is tasked with these functions, amongst others:
+//
+//  * Fast-forwards the chain as it learns of new TipSets from the network via
+//    the SyncManager.
+//  * Applies the fork choice rule to select the correct side when confronted
+//    with a fork in the network.
+//  * Requests block headers and messages from other peers when not available
+//    in our BlockStore.
+//  * Tracks blocks marked as bad in a cache.
+//  * Keeps the BlockStore and ChainStore consistent with our view of the world,
+//    the latter of which in turn informs other components when a reorg has been
+//    committed.
+//
+// The Syncer does not run workers itself. It's mainly concerned with
+// ensuring a consistent state of chain consensus. The reactive and network-
+// interfacing processes are part of other components, such as the SyncManager
+// (which owns the sync scheduler and sync workers), BlockSync, the HELLO
+// protocol, and the gossipsub block propagation layer.
+//
+// {hint/concept} The fork-choice rule as it currently stands is: "pick the
+// chain with the heaviest weight, so long as it hasn’t deviated one finality
+// threshold from our head (900 epochs, parameter determined by spec-actors)".
 type Syncer struct {
 	// The interface for accessing and putting tipsets into local storage
 	store *store.ChainStore
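A compact sketch of the {hint/concept} fork-choice rule quoted above. The constant name, the parameter list and the weight/height comparison are illustrative simplifications, not the actual code path, which is spread across the Syncer and ChainStore.

package example // illustration only

import "github.com/filecoin-project/lotus/chain/types"

// finalityEpochs mirrors the 900-epoch threshold mentioned in the godoc; the
// real constant lives in the build/params packages.
const finalityEpochs = 900

// preferCandidate returns true if we should adopt the candidate chain: it must
// be strictly heavier than our head, and it must not have deviated from it by
// more than one finality threshold.
func preferCandidate(head, candidate *types.TipSet, headWeight, candidateWeight types.BigInt) bool {
	if !candidateWeight.GreaterThan(headWeight) {
		return false // not heavier: keep our current head
	}
	if head.Height() > candidate.Height() && head.Height()-candidate.Height() > finalityEpochs {
		return false // deviates by more than one finality threshold: reject
	}
	return true
}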
@@ -85,6 +108,7 @@ type Syncer struct {
 	verifier ffiwrapper.Verifier
 }
 
+// NewSyncer creates a new Syncer object.
 func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
 	gen, err := sm.ChainStore().GetGenesis()
 	if err != nil {
@@ -182,6 +206,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
 	return true
 }
 
+// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to
+// receive new block headers as they arrive from the network, and sends them to
+// the returned channel.
+//
+// These blocks have not necessarily been incorporated into our view of the chain.
 func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) {
 	sub := syncer.incoming.Sub(LocalIncoming)
 	out := make(chan *types.BlockHeader, 10)
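A usage sketch for the newly documented IncomingBlocks; the function name and the use of fmt for output are illustrative.

package example // illustration only

import (
	"context"
	"fmt"

	"github.com/filecoin-project/lotus/chain"
)

// watchIncoming prints incoming block headers until the context is cancelled.
// Per the godoc above, these headers may not yet be part of our chain.
func watchIncoming(ctx context.Context, syncer *chain.Syncer) error {
	headers, err := syncer.IncomingBlocks(ctx)
	if err != nil {
		return err
	}
	for {
		select {
		case hdr := <-headers:
			fmt.Printf("incoming header %s at height %d\n", hdr.Cid(), hdr.Height)
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}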
@@ -209,11 +238,15 @@ func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHe
 	return out, nil
 }
 
+// ValidateMsgMeta performs structural and content hash validation of the
+// messages within this block. If validation passes, it stores the messages in
+// the underlying IPLD block store.
 func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
 	if msgc := len(fblk.BlsMessages) + len(fblk.SecpkMessages); msgc > build.BlockMessageLimit {
 		return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc)
 	}
 
+	// Collect the CIDs of both types of messages separately: BLS and Secpk.
 	var bcids, scids []cbg.CBORMarshaler
 	for _, m := range fblk.BlsMessages {
 		c := cbg.CborCid(m.Cid())
@@ -231,11 +264,14 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error {
 	blockstore := syncer.store.Blockstore()
 
 	bs := cbor.NewCborStore(blockstore)
+
+	// Compute the root CID of the combined message trie.
 	smroot, err := computeMsgMeta(bs, bcids, scids)
 	if err != nil {
 		return xerrors.Errorf("validating msgmeta, compute failed: %w", err)
 	}
 
+	// Check that the message trie root matches with what's in the block.
 	if fblk.Header.Messages != smroot {
 		return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot)
 	}
@@ -345,6 +381,8 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types
 	return fts, nil
 }
 
+// computeMsgMeta computes the root CID of the combined arrays of message CIDs
+// of both types (BLS and Secpk).
 func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) {
 	ctx := context.TODO()
 	bmroot, err := amt.FromArray(ctx, bs, bmsgCids)
@@ -368,14 +406,24 @@ func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (
 	return mrcid, nil
 }
 
+// FetchTipSet tries to load the provided tipset from the store, and falls back
+// to the network (BlockSync) by querying the supplied peer if not found
+// locally.
+//
+// {hint/usage} This is used from the HELLO protocol, to fetch the greeting
+// peer's heaviest tipset if we don't have it.
 func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) {
 	if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil {
 		return fts, nil
 	}
 
+	// fall back to the network.
 	return syncer.Bsync.GetFullTipSet(ctx, p, tsk)
 }
 
+// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full
+// representation of it containing FullBlocks. If not all blocks are found
+// locally, it errors entirely with blockstore.ErrNotFound.
 func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) {
 	ts, err := syncer.store.LoadTipSet(tsk)
 	if err != nil {
@@ -400,6 +448,12 @@ func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet,
 	return fts, nil
 }
 
+// Sync tries to advance our view of the chain to `maybeHead`. It does nothing
+// if our current head is heavier than the requested tipset, or if we're already
+// at the requested head, or if the head is the genesis.
+//
+// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the
+// godocs on that method for a more detailed view.
 func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
 	ctx, span := trace.StartSpan(ctx, "chain.Sync")
 	defer span.End()
@@ -1004,6 +1058,39 @@ func extractSyncState(ctx context.Context) *SyncerState {
 	return nil
 }
 
+// collectHeaders collects the headers from the blocks between any two tipsets.
+//
+// `from` is the heaviest/projected/target tipset we have learned about, and
+// `to` is usually an anchor tipset we already have in our view of the chain
+// (which could be the genesis).
+//
+// collectHeaders checks if portions of the chain are in our ChainStore; falling
+// back to the network to retrieve the missing parts. If during the process, any
+// portion we receive is in our denylist (bad list), we short-circuit.
+//
+// {hint/naming}: `from` and `to` are in inverse order. `from` is the highest,
+// and `to` is the lowest. This method traverses the chain backwards.
+//
+// {hint/usage}: This is used by collectChain, which is in turn called from the
+// main Sync method (Syncer#Sync), so it's a pretty central method.
+//
+// {hint/logic}: The logic of this method is as follows:
+//
+//  1. Check that the from tipset is not linked to a parent block known to be
+//     bad.
+//  2. Check the consistency of beacon entries in the from tipset. We check
+//     total equality of the BeaconEntries in each block.
+//  3. Traverse the chain backwards, and for each tipset:
+//     3a. Load it from the chainstore; if found, move on to its parent.
+//     3b. Query our peers via BlockSync in batches, requesting up to a
+//         maximum of 500 tipsets every time.
+//
+// Once we've concluded, if we find a mismatching tipset at the height where the
+// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork
+// to resolve it. Refer to the godocs there.
+//
+// All throughout the process, we keep checking if the received blocks are in
+// the deny list, and short-circuit the process if so.
 func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
 	ctx, span := trace.StartSpan(ctx, "collectHeaders")
 	defer span.End()
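To make step 3b of the {hint/logic} list above concrete, here is a simplified sketch of the windowed backwards traversal. The names, the omitted denylist checks, and the error handling are illustrative; this is not the actual collectHeaders body.

package example // illustration only

import (
	"context"

	"github.com/filecoin-project/lotus/chain/blocksync"
	"github.com/filecoin-project/lotus/chain/types"
)

// fetchBackwards walks from `from` down towards `to`, requesting at most 500
// tipsets per BlockSync call, and returns the collected tipsets (highest first).
func fetchBackwards(ctx context.Context, bs *blocksync.BlockSync, from, to *types.TipSet) ([]*types.TipSet, error) {
	const maxWindow = 500

	out := []*types.TipSet{from}
	at := from.Parents() // anchor key for the next batch

	for out[len(out)-1].Height() > to.Height() {
		window := maxWindow
		if gap := int(out[len(out)-1].Height() - to.Height()); gap < window {
			window = gap
		}
		blks, err := bs.GetBlocks(ctx, at, window)
		if err != nil {
			return nil, err
		}
		if len(blks) == 0 {
			break // nothing more available; the caller decides what to do
		}
		out = append(out, blks...)
		at = blks[len(blks)-1].Parents()
	}
	return out, nil
}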
@@ -1020,6 +1107,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to
 		}
 	}
 
+	// Check if the parents of the from block are in the denylist.
+	// i.e. if a fork of the chain has been requested that we know to be bad.
 	for _, pcid := range from.Parents().Cids() {
 		if reason, ok := syncer.bad.Has(pcid); ok {
 			markBad("linked to %s", pcid)
@@ -1090,8 +1179,8 @@ loop:
 		}
 
 		// NB: GetBlocks validates that the blocks are in-fact the ones we
-		// requested, and that they are correctly linked to eachother. It does
-		// not validate any state transitions
+		// requested, and that they are correctly linked to one another. It does
+		// not validate any state transitions.
 		window := 500
 		if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window {
 			window = gap
@@ -1132,7 +1221,6 @@ loop:
 		at = blks[len(blks)-1].Parents()
 	}
 
-	// We have now ascertained that this is *not* a 'fast forward'
 	if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) {
 		last := blockSet[len(blockSet)-1]
 		if last.Parents() == to.Parents() {
@@ -1140,6 +1228,8 @@ loop:
 			return blockSet, nil
 		}
 
+		// We have now ascertained that this is *not* a 'fast forward'
+
 		log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height())
 		fork, err := syncer.syncFork(ctx, last, to)
 		if err != nil {
@@ -1161,6 +1251,12 @@ loop:
 
 var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
 
+// syncFork tries to obtain the chain fragment that links a fork into a common
+// ancestor in our view of the chain.
+//
+// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
+// denylist. Else, we find the common ancestor, and add the missing chain
+// fragment until the fork point to the returned []TipSet.
 func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) {
 	tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold))
 	if err != nil {
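A simplified sketch of the common-ancestor search that the new syncFork godoc describes. It is assumed to sit alongside the Syncer in package chain (so ErrForkTooLong from the hunk above and the store/types imports resolve); the function name and the exact slice returned are illustrative, not the real implementation.

// findForkPoint scans the fetched fork fragment (highest tipset first) while
// walking our own chain backwards, and returns the fragment up to and
// including the fork point, or ErrForkTooLong if no common ancestor is found.
func findForkPoint(cs *store.ChainStore, fork []*types.TipSet, ours *types.TipSet) ([]*types.TipSet, error) {
	nts := ours
	for i, ts := range fork {
		// Walk our chain down to (at most) the candidate's height.
		for nts.Height() > ts.Height() {
			parent, err := cs.LoadTipSet(nts.Parents())
			if err != nil {
				return nil, err
			}
			nts = parent
		}
		if nts.Equals(ts) {
			return fork[:i+1], nil // common ancestor found
		}
	}
	return nil, ErrForkTooLong
}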
@@ -1312,6 +1408,25 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error {
 	return nil
 }
 
+// collectChain tries to advance our view of the chain to the purported head.
+//
+// It goes through various stages:
+//
+//  1. StageHeaders: we proceed in the sync process by requesting block headers
+//     from our peers, moving back from their heads, until we reach a tipset
+//     that we have in common (such a common tipset must exist, though it may
+//     simply be the genesis block).
+//
+//     If the common tipset is our head, we treat the sync as a "fast-forward",
+//     else we must drop part of our chain to connect to the peer's head
+//     (referred to as "forking").
+//
+//  2. StagePersistHeaders: now that we've collected the missing headers,
+//     augmented by those on the other side of a fork, we persist them to the
+//     BlockStore.
+//
+//  3. StageMessages: having acquired the headers and found a common tipset,
+//     we then move forward, requesting the full blocks, including the messages.
 func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error {
 	ctx, span := trace.StartSpan(ctx, "collectChain")
 	defer span.End()
@@ -1361,9 +1476,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error
 func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []byte, evrf []byte) error {
 	if build.InsecurePoStValidation {
 		return nil
-	} else {
-		return gen.VerifyVRF(ctx, worker, rand, evrf)
 	}
+	return gen.VerifyVRF(ctx, worker, rand, evrf)
 }
 
 func (syncer *Syncer) State() []SyncerState {
@@ -1374,6 +1488,7 @@ func (syncer *Syncer) State() []SyncerState {
 	return out
 }
 
+// MarkBad manually adds a block to the "bad blocks" cache.
 func (syncer *Syncer) MarkBad(blk cid.Cid) {
 	syncer.bad.Add(blk, "manually marked bad")
 }
@@ -1381,7 +1496,7 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) {
 func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) {
 	return syncer.bad.Has(blk)
 }
-func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
+func (syncer *Syncer) getLatestBeaconEntry(_ context.Context, ts *types.TipSet) (*types.BeaconEntry, error) {
 	cur := ts
 	for i := 0; i < 20; i++ {
 		cbe := cur.Blocks()[0].BeaconEntries
@@ -252,6 +252,12 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti
 	return mpower.MinerPower.QualityAdjPower.GreaterThanEqual(power.ConsensusMinerMinPower), nil
 }
 
+// mineOne mines a single block, and does so synchronously, if and only if we
+// have won the current round.
+//
+// {hint/landmark}: This method coordinates all the steps involved in mining a
+// block, including the decision of whether to mine at all, depending on
+// whether we win the round or not.
 func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) {
 	log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids()))
 	start := time.Now()