From c8104a03e61f56f17c4a39d5efbf0ad23069dc07 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Ra=C3=BAl=20Kripalani?= Date: Tue, 23 Jun 2020 22:51:25 +0100 Subject: [PATCH] some initial godocs. (#2118) --- chain/beacon/beacon.go | 4 + chain/beacon/drand/drand.go | 7 ++ chain/blocksync/blocksync.go | 19 +++++ chain/blocksync/blocksync_client.go | 8 ++ chain/store/fts.go | 3 + chain/store/index.go | 2 +- chain/store/store.go | 19 +++++ chain/sync.go | 127 ++++++++++++++++++++++++++-- miner/miner.go | 6 ++ 9 files changed, 188 insertions(+), 7 deletions(-) diff --git a/chain/beacon/beacon.go b/chain/beacon/beacon.go index 34405f3c8..2be2e7f1c 100644 --- a/chain/beacon/beacon.go +++ b/chain/beacon/beacon.go @@ -17,6 +17,10 @@ type Response struct { Err error } +// RandomBeacon represents a system that provides randomness to Lotus. +// Other components interrogate the RandomBeacon to acquire randomness that's +// valid for a specific chain epoch. Also to verify beacon entries that have +// been posted on chain. type RandomBeacon interface { Entry(context.Context, uint64) <-chan Response VerifyEntry(types.BeaconEntry, types.BeaconEntry) error diff --git a/chain/beacon/drand/drand.go b/chain/beacon/drand/drand.go index eb51a2af3..00ff05f81 100644 --- a/chain/beacon/drand/drand.go +++ b/chain/beacon/drand/drand.go @@ -41,6 +41,13 @@ func (dp *drandPeer) IsTLS() bool { return dp.tls } +// DrandBeacon connects Lotus with a drand network in order to provide +// randomness to the system in a way that's aligned with Filecoin rounds/epochs. +// +// We connect to drand peers via their public HTTP endpoints. The peers are +// enumerated in the drandServers variable. +// +// The root trust for the Drand chain is configured from build.DrandChain. type DrandBeacon struct { client dclient.Client diff --git a/chain/blocksync/blocksync.go b/chain/blocksync/blocksync.go index daca9ce20..a9251c419 100644 --- a/chain/blocksync/blocksync.go +++ b/chain/blocksync/blocksync.go @@ -10,6 +10,7 @@ import ( "golang.org/x/xerrors" cborutil "github.com/filecoin-project/go-cbor-util" + "github.com/filecoin-project/lotus/chain/store" "github.com/filecoin-project/lotus/chain/types" @@ -27,6 +28,24 @@ const BlockSyncProtocolID = "/fil/sync/blk/0.0.1" const BlockSyncMaxRequestLength = 800 +// BlockSyncService is the component that services BlockSync requests from +// peers. +// +// BlockSync is the basic chain synchronization protocol of Filecoin. BlockSync +// is an RPC-oriented protocol, with a single operation to request blocks. +// +// A request contains a start anchor block (referred to with a CID), and a +// amount of blocks requested beyond the anchor (including the anchor itself). +// +// A client can also pass options, encoded as a 64-bit bitfield. Lotus supports +// two options at the moment: +// +// - include block contents +// - include block messages +// +// The response will include a status code, an optional message, and the +// response payload in case of success. The payload is a slice of serialized +// tipsets. type BlockSyncService struct { cs *store.ChainStore } diff --git a/chain/blocksync/blocksync_client.go b/chain/blocksync/blocksync_client.go index 129e8d332..daa4b6335 100644 --- a/chain/blocksync/blocksync_client.go +++ b/chain/blocksync/blocksync_client.go @@ -64,6 +64,11 @@ func (bs *BlockSync) processStatus(req *BlockSyncRequest, res *BlockSyncResponse } } +// GetBlocks fetches count blocks from the network, from the provided tipset +// *backwards*, returning as many tipsets as count. +// +// {hint/usage}: This is used by the Syncer during normal chain syncing and when +// resolving forks. func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count int) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "bsync.GetBlocks") defer span.End() @@ -80,7 +85,9 @@ func (bs *BlockSync) GetBlocks(ctx context.Context, tsk types.TipSetKey, count i Options: BSOptBlocks, } + // this peerset is sorted by latency and failure counting. peers := bs.getPeers() + // randomize the first few peers so we don't always pick the same peer shufflePrefix(peers) @@ -356,6 +363,7 @@ func (bs *BlockSync) RemovePeer(p peer.ID) { bs.syncPeers.removePeer(p) } +// getPeers returns a preference-sorted set of peers to query. func (bs *BlockSync) getPeers() []peer.ID { return bs.syncPeers.prefSortedPeers() } diff --git a/chain/store/fts.go b/chain/store/fts.go index f9ec4459e..0324938d7 100644 --- a/chain/store/fts.go +++ b/chain/store/fts.go @@ -32,8 +32,11 @@ func (fts *FullTipSet) Cids() []cid.Cid { return cids } +// TipSet returns a narrower view of this FullTipSet elliding the block +// messages. func (fts *FullTipSet) TipSet() *types.TipSet { if fts.tipset != nil { + // FIXME: fts.tipset is actually never set. Should it memoize? return fts.tipset } diff --git a/chain/store/index.go b/chain/store/index.go index bb363ec18..7edbf251f 100644 --- a/chain/store/index.go +++ b/chain/store/index.go @@ -34,7 +34,7 @@ type lbEntry struct { target types.TipSetKey } -func (ci *ChainIndex) GetTipsetByHeight(ctx context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) { +func (ci *ChainIndex) GetTipsetByHeight(_ context.Context, from *types.TipSet, to abi.ChainEpoch) (*types.TipSet, error) { if from.Height()-to <= ci.skipLength { return ci.walkBack(from, to) } diff --git a/chain/store/store.go b/chain/store/store.go index 0edccb95c..4dabb96f7 100644 --- a/chain/store/store.go +++ b/chain/store/store.go @@ -52,6 +52,15 @@ var blockValidationCacheKeyPrefix = dstore.NewKey("blockValidation") // ReorgNotifee represents a callback that gets called upon reorgs. type ReorgNotifee func(rev, app []*types.TipSet) error +// ChainStore is the main point of access to chain data. +// +// Raw chain data is stored in the Blockstore, with relevant markers (genesis, +// latest head tipset references) being tracked in the Datastore (key-value +// store). +// +// To alleviate disk access, the ChainStore has two ARC caches: +// 1. a tipset cache +// 2. a block => messages references cache. type ChainStore struct { bs bstore.Blockstore ds dstore.Datastore @@ -266,6 +275,9 @@ func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error { return nil } +// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our +// internal state as our new head, if and only if it is heavier than the current +// head. func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() @@ -331,6 +343,9 @@ func (cs *ChainStore) reorgWorker(ctx context.Context, initialNotifees []ReorgNo return out } +// takeHeaviestTipSet actually sets the incoming tipset as our head both in +// memory and in the ChainStore. It also sends a notification to deliver to +// ReorgNotifees. func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet) error { _, span := trace.StartSpan(ctx, "takeHeaviestTipSet") defer span.End() @@ -368,6 +383,7 @@ func (cs *ChainStore) SetHead(ts *types.TipSet) error { return cs.takeHeaviestTipSet(context.TODO(), ts) } +// Contains returns whether our BlockStore has all blocks in the supplied TipSet. func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { for _, c := range ts.Cids() { has, err := cs.bs.Has(c) @@ -382,6 +398,8 @@ func (cs *ChainStore) Contains(ts *types.TipSet) (bool, error) { return true, nil } +// GetBlock fetches a BlockHeader with the supplied CID. It returns +// blockstore.ErrNotFound if the block was not found in the BlockStore. func (cs *ChainStore) GetBlock(c cid.Cid) (*types.BlockHeader, error) { sb, err := cs.bs.Get(c) if err != nil { @@ -474,6 +492,7 @@ func (cs *ChainStore) ReorgOps(a, b *types.TipSet) ([]*types.TipSet, []*types.Ti return leftChain, rightChain, nil } +// GetHeaviestTipSet returns the current heaviest tipset known (i.e. our head). func (cs *ChainStore) GetHeaviestTipSet() *types.TipSet { cs.heaviestLk.Lock() defer cs.heaviestLk.Unlock() diff --git a/chain/sync.go b/chain/sync.go index f20637c9e..0a08e8b15 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -53,6 +53,29 @@ var log = logging.Logger("chain") var LocalIncoming = "incoming" +// Syncer is in charge of running the chain synchronization logic. As such, it +// is tasked with these functions, amongst others: +// +// * Fast-forwards the chain as it learns of new TipSets from the network via +// the SyncManager. +// * Applies the fork choice rule to select the correct side when confronted +// with a fork in the network. +// * Requests block headers and messages from other peers when not available +// in our BlockStore. +// * Tracks blocks marked as bad in a cache. +// * Keeps the BlockStore and ChainStore consistent with our view of the world, +// the latter of which in turn informs other components when a reorg has been +// committed. +// +// The Syncer does not run workers itself. It's mainly concerned with +// ensuring a consistent state of chain consensus. The reactive and network- +// interfacing processes are part of other components, such as the SyncManager +// (which owns the sync scheduler and sync workers), BlockSync, the HELLO +// protocol, and the gossipsub block propagation layer. +// +// {hint/concept} The fork-choice rule as it currently stands is: "pick the +// chain with the heaviest weight, so long as it hasn’t deviated one finality +// threshold from our head (900 epochs, parameter determined by spec-actors)". type Syncer struct { // The interface for accessing and putting tipsets into local storage store *store.ChainStore @@ -85,6 +108,7 @@ type Syncer struct { verifier ffiwrapper.Verifier } +// NewSyncer creates a new Syncer object. func NewSyncer(sm *stmgr.StateManager, bsync *blocksync.BlockSync, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { @@ -182,6 +206,11 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool { return true } +// IncomingBlocks spawns a goroutine that subscribes to the local eventbus to +// receive new block headers as they arrive from the network, and sends them to +// the returned channel. +// +// These blocks have not necessarily been incorporated to our view of the chain. func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) { sub := syncer.incoming.Sub(LocalIncoming) out := make(chan *types.BlockHeader, 10) @@ -209,11 +238,15 @@ func (syncer *Syncer) IncomingBlocks(ctx context.Context) (<-chan *types.BlockHe return out, nil } +// ValidateMsgMeta performs structural and content hash validation of the +// messages within this block. If validation passes, it stores the messages in +// the underlying IPLD block store. func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { if msgc := len(fblk.BlsMessages) + len(fblk.SecpkMessages); msgc > build.BlockMessageLimit { return xerrors.Errorf("block %s has too many messages (%d)", fblk.Header.Cid(), msgc) } + // Collect the CIDs of both types of messages separately: BLS and Secpk. var bcids, scids []cbg.CBORMarshaler for _, m := range fblk.BlsMessages { c := cbg.CborCid(m.Cid()) @@ -231,11 +264,14 @@ func (syncer *Syncer) ValidateMsgMeta(fblk *types.FullBlock) error { blockstore := syncer.store.Blockstore() bs := cbor.NewCborStore(blockstore) + + // Compute the root CID of the combined message trie. smroot, err := computeMsgMeta(bs, bcids, scids) if err != nil { return xerrors.Errorf("validating msgmeta, compute failed: %w", err) } + // Check that the message trie root matches with what's in the block. if fblk.Header.Messages != smroot { return xerrors.Errorf("messages in full block did not match msgmeta root in header (%s != %s)", fblk.Header.Messages, smroot) } @@ -345,6 +381,8 @@ func zipTipSetAndMessages(bs cbor.IpldStore, ts *types.TipSet, allbmsgs []*types return fts, nil } +// computeMsgMeta computes the root CID of the combined arrays of message CIDs +// of both types (BLS and Secpk). func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) (cid.Cid, error) { ctx := context.TODO() bmroot, err := amt.FromArray(ctx, bs, bmsgCids) @@ -368,14 +406,24 @@ func computeMsgMeta(bs cbor.IpldStore, bmsgCids, smsgCids []cbg.CBORMarshaler) ( return mrcid, nil } +// FetchTipSet tries to load the provided tipset from the store, and falls back +// to the network (BlockSync) by querying the supplied peer if not found +// locally. +// +// {hint/usage} This is used from the HELLO protocol, to fetch the greeting +// peer's heaviest tipset if we don't have it. func (syncer *Syncer) FetchTipSet(ctx context.Context, p peer.ID, tsk types.TipSetKey) (*store.FullTipSet, error) { if fts, err := syncer.tryLoadFullTipSet(tsk); err == nil { return fts, nil } + // fall back to the network. return syncer.Bsync.GetFullTipSet(ctx, p, tsk) } +// tryLoadFullTipSet queries the tipset in the ChainStore, and returns a full +// representation of it containing FullBlocks. If ALL blocks are not found +// locally, it errors entirely with blockstore.ErrNotFound. func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, error) { ts, err := syncer.store.LoadTipSet(tsk) if err != nil { @@ -400,6 +448,12 @@ func (syncer *Syncer) tryLoadFullTipSet(tsk types.TipSetKey) (*store.FullTipSet, return fts, nil } +// Sync tries to advance our view of the chain to `maybeHead`. It does nothing +// if our current head is heavier than the requested tipset, or if we're already +// at the requested head, or if the head is the genesis. +// +// Most of the heavy-lifting logic happens in syncer#collectChain. Refer to the +// godocs on that method for a more detailed view. func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "chain.Sync") defer span.End() @@ -1004,6 +1058,39 @@ func extractSyncState(ctx context.Context) *SyncerState { return nil } +// collectHeaders collects the headers from the blocks between any two tipsets. +// +// `from` is the heaviest/projected/target tipset we have learned about, and +// `to` is usually an anchor tipset we already have in our view of the chain +// (which could be the genesis). +// +// collectHeaders checks if portions of the chain are in our ChainStore; falling +// down to the network to retrieve the missing parts. If during the process, any +// portion we receive is in our denylist (bad list), we short-circuit. +// +// {hint/naming}: `from` and `to` is in inverse order. `from` is the highest, +// and `to` is the lowest. This method traverses the chain backwards. +// +// {hint/usage}: This is used by collectChain, which is in turn called from the +// main Sync method (Syncer#Sync), so it's a pretty central method. +// +// {hint/logic}: The logic of this method is as follows: +// +// 1. Check that the from tipset is not linked to a parent block known to be +// bad. +// 2. Check the consistency of beacon entries in the from tipset. We check +// total equality of the BeaconEntries in each block. +// 3. Travers the chain backwards, for each tipset: +// 3a. Load it from the chainstore; if found, it move on to its parent. +// 3b. Query our peers via BlockSync in batches, requesting up to a +// maximum of 500 tipsets every time. +// +// Once we've concluded, if we find a mismatching tipset at the height where the +// anchor tipset should be, we are facing a fork, and we invoke Syncer#syncFork +// to resolve it. Refer to the godocs there. +// +// All throughout the process, we keep checking if the received blocks are in +// the deny list, and short-circuit the process if so. func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { ctx, span := trace.StartSpan(ctx, "collectHeaders") defer span.End() @@ -1020,6 +1107,8 @@ func (syncer *Syncer) collectHeaders(ctx context.Context, from *types.TipSet, to } } + // Check if the parents of the from block are in the denylist. + // i.e. if a fork of the chain has been requested that we know to be bad. for _, pcid := range from.Parents().Cids() { if reason, ok := syncer.bad.Has(pcid); ok { markBad("linked to %s", pcid) @@ -1090,8 +1179,8 @@ loop: } // NB: GetBlocks validates that the blocks are in-fact the ones we - // requested, and that they are correctly linked to eachother. It does - // not validate any state transitions + // requested, and that they are correctly linked to one another. It does + // not validate any state transitions. window := 500 if gap := int(blockSet[len(blockSet)-1].Height() - untilHeight); gap < window { window = gap @@ -1132,7 +1221,6 @@ loop: at = blks[len(blks)-1].Parents() } - // We have now ascertained that this is *not* a 'fast forward' if !types.CidArrsEqual(blockSet[len(blockSet)-1].Parents().Cids(), to.Cids()) { last := blockSet[len(blockSet)-1] if last.Parents() == to.Parents() { @@ -1140,6 +1228,8 @@ loop: return blockSet, nil } + // We have now ascertained that this is *not* a 'fast forward' + log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", from.Cids(), from.Height(), to.Cids(), to.Height()) fork, err := syncer.syncFork(ctx, last, to) if err != nil { @@ -1161,6 +1251,12 @@ loop: var ErrForkTooLong = fmt.Errorf("fork longer than threshold") +// syncFork tries to obtain the chain fragment that links a fork into a common +// ancestor in our view of the chain. +// +// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the +// denylist. Else, we find the common ancestor, and add the missing chain +// fragment until the fork point to the returned []TipSet. func (syncer *Syncer) syncFork(ctx context.Context, from *types.TipSet, to *types.TipSet) ([]*types.TipSet, error) { tips, err := syncer.Bsync.GetBlocks(ctx, from.Parents(), int(build.ForkLengthThreshold)) if err != nil { @@ -1312,6 +1408,25 @@ func persistMessages(bs bstore.Blockstore, bst *blocksync.BSTipSet) error { return nil } +// collectChain tries to advance our view of the chain to the purported head. +// +// It goes through various stages: +// +// 1. StageHeaders: we proceed in the sync process by requesting block headers +// from our peers, moving back from their heads, until we reach a tipset +// that we have in common (such a common tipset must exist, thought it may +// simply be the genesis block). +// +// If the common tipset is our head, we treat the sync as a "fast-forward", +// else we must drop part of our chain to connect to the peer's head +// (referred to as "forking"). +// +// 2. StagePersistHeaders: now that we've collected the missing headers, +// augmented by those on the other side of a fork, we persist them to the +// BlockStore. +// +// 3. StageMessages: having acquired the headers and found a common tipset, +// we then move forward, requesting the full blocks, including the messages. func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error { ctx, span := trace.StartSpan(ctx, "collectChain") defer span.End() @@ -1361,9 +1476,8 @@ func (syncer *Syncer) collectChain(ctx context.Context, ts *types.TipSet) error func VerifyElectionPoStVRF(ctx context.Context, worker address.Address, rand []byte, evrf []byte) error { if build.InsecurePoStValidation { return nil - } else { - return gen.VerifyVRF(ctx, worker, rand, evrf) } + return gen.VerifyVRF(ctx, worker, rand, evrf) } func (syncer *Syncer) State() []SyncerState { @@ -1374,6 +1488,7 @@ func (syncer *Syncer) State() []SyncerState { return out } +// MarkBad manually adds a block to the "bad blocks" cache. func (syncer *Syncer) MarkBad(blk cid.Cid) { syncer.bad.Add(blk, "manually marked bad") } @@ -1381,7 +1496,7 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) { func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) { return syncer.bad.Has(blk) } -func (syncer *Syncer) getLatestBeaconEntry(ctx context.Context, ts *types.TipSet) (*types.BeaconEntry, error) { +func (syncer *Syncer) getLatestBeaconEntry(_ context.Context, ts *types.TipSet) (*types.BeaconEntry, error) { cur := ts for i := 0; i < 20; i++ { cbe := cur.Blocks()[0].BeaconEntries diff --git a/miner/miner.go b/miner/miner.go index bdeed8ac5..fa97bd265 100644 --- a/miner/miner.go +++ b/miner/miner.go @@ -252,6 +252,12 @@ func (m *Miner) hasPower(ctx context.Context, addr address.Address, ts *types.Ti return mpower.MinerPower.QualityAdjPower.GreaterThanEqual(power.ConsensusMinerMinPower), nil } +// mineOne mines a single block, and does so synchronously, if and only if we +// have won the current round. +// +// {hint/landmark}: This method coordinates all the steps involved in mining a +// block, including the condition of whether mine or not at all depending on +// whether we win the round or not. func (m *Miner) mineOne(ctx context.Context, base *MiningBase) (*types.BlockMsg, error) { log.Debugw("attempting to mine a block", "tipset", types.LogCids(base.TipSet.Cids())) start := time.Now()