feat: chainstore: FRC-0051: Remove all equivocated blocks from tipsets

This commit is contained in:
Aayush 2023-07-27 10:34:44 -04:00
parent 93b0ea2108
commit 52657c5cb0
7 changed files with 275 additions and 102 deletions

View File

@ -449,18 +449,19 @@ func (cg *ChainGen) NextTipSetFromMiners(base *types.TipSet, miners []address.Ad
}
func (cg *ChainGen) NextTipSetFromMinersWithMessagesAndNulls(base *types.TipSet, miners []address.Address, msgs [][]*types.SignedMessage, nulls abi.ChainEpoch) (*store.FullTipSet, error) {
ctx := context.TODO()
var blks []*types.FullBlock
for round := base.Height() + nulls + 1; len(blks) == 0; round++ {
for mi, m := range miners {
bvals, et, ticket, err := cg.nextBlockProof(context.TODO(), base, m, round)
bvals, et, ticket, err := cg.nextBlockProof(ctx, base, m, round)
if err != nil {
return nil, xerrors.Errorf("next block proof: %w", err)
}
if et != nil {
// TODO: maybe think about passing in more real parameters to this?
wpost, err := cg.eppProvs[m].ComputeProof(context.TODO(), nil, nil, round, network.Version0)
wpost, err := cg.eppProvs[m].ComputeProof(ctx, nil, nil, round, network.Version0)
if err != nil {
return nil, err
}
@ -476,8 +477,18 @@ func (cg *ChainGen) NextTipSetFromMinersWithMessagesAndNulls(base *types.TipSet,
}
fts := store.NewFullTipSet(blks)
if err := cg.cs.PutTipSet(context.TODO(), fts.TipSet()); err != nil {
return nil, err
if err := cg.cs.PersistTipsets(ctx, []*types.TipSet{fts.TipSet()}); err != nil {
return nil, xerrors.Errorf("failed to persist tipset: %w", err)
}
for _, blk := range blks {
if err := cg.cs.AddToTipSetTracker(ctx, blk.Header); err != nil {
return nil, xerrors.Errorf("failed to add to tipset tracker: %w", err)
}
}
if err := cg.cs.RefreshHeaviestTipSet(ctx, fts.TipSet().Height()); err != nil {
return nil, xerrors.Errorf("failed to put tipset: %w", err)
}
cg.CurTipset = fts

View File

@ -70,7 +70,7 @@ func TestChainCheckpoint(t *testing.T) {
}
// See if the chain will take the fork, it shouldn't.
err = cs.MaybeTakeHeavierTipSet(context.Background(), last)
err = cs.RefreshHeaviestTipSet(context.Background(), last.Height())
require.NoError(t, err)
head = cs.GetHeaviestTipSet()
require.True(t, head.Equals(checkpoint))
@ -80,7 +80,7 @@ func TestChainCheckpoint(t *testing.T) {
require.NoError(t, err)
// Now switch to the other fork.
err = cs.MaybeTakeHeavierTipSet(context.Background(), last)
err = cs.RefreshHeaviestTipSet(context.Background(), last.Height())
require.NoError(t, err)
head = cs.GetHeaviestTipSet()
require.True(t, head.Equals(last))

View File

@ -16,6 +16,7 @@ import (
"github.com/filecoin-project/lotus/chain/consensus/filcns"
"github.com/filecoin-project/lotus/chain/gen"
"github.com/filecoin-project/lotus/chain/store"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/chain/types/mock"
)
@ -47,28 +48,29 @@ func TestIndexSeeks(t *testing.T) {
}
cur := mock.TipSet(gen)
if err := cs.PutTipSet(ctx, mock.TipSet(gen)); err != nil {
t.Fatal(err)
}
assert.NoError(t, cs.SetGenesis(ctx, gen))
// Put 113 blocks from genesis
for i := 0; i < 113; i++ {
nextts := mock.TipSet(mock.MkBlock(cur, 1, 1))
if err := cs.PutTipSet(ctx, nextts); err != nil {
t.Fatal(err)
}
nextBlk := mock.MkBlock(cur, 1, 1)
nextts := mock.TipSet(nextBlk)
assert.NoError(t, cs.PersistTipsets(ctx, []*types.TipSet{nextts}))
assert.NoError(t, cs.AddToTipSetTracker(ctx, nextBlk))
cur = nextts
}
assert.NoError(t, cs.RefreshHeaviestTipSet(ctx, cur.Height()))
// Put 50 null epochs + 1 block
skip := mock.MkBlock(cur, 1, 1)
skip.Height += 50
skipts := mock.TipSet(skip)
if err := cs.PutTipSet(ctx, skipts); err != nil {
assert.NoError(t, cs.PersistTipsets(ctx, []*types.TipSet{skipts}))
assert.NoError(t, cs.AddToTipSetTracker(ctx, skip))
if err := cs.RefreshHeaviestTipSet(ctx, skip.Height); err != nil {
t.Fatal(err)
}

View File

@ -367,49 +367,32 @@ func (cs *ChainStore) UnmarkBlockAsValidated(ctx context.Context, blkid cid.Cid)
func (cs *ChainStore) SetGenesis(ctx context.Context, b *types.BlockHeader) error {
ts, err := types.NewTipSet([]*types.BlockHeader{b})
if err != nil {
return err
return xerrors.Errorf("failed to construct genesis tipset: %w", err)
}
if err := cs.PutTipSet(ctx, ts); err != nil {
return err
if err := cs.PersistTipsets(ctx, []*types.TipSet{ts}); err != nil {
return xerrors.Errorf("failed to persist genesis tipset: %w", err)
}
if err := cs.AddToTipSetTracker(ctx, b); err != nil {
return xerrors.Errorf("failed to add genesis tipset to tracker: %w", err)
}
if err := cs.RefreshHeaviestTipSet(ctx, ts.Height()); err != nil {
return xerrors.Errorf("failed to put genesis tipset: %w", err)
}
return cs.metadataDs.Put(ctx, dstore.NewKey("0"), b.Cid().Bytes())
}
func (cs *ChainStore) PutTipSet(ctx context.Context, ts *types.TipSet) error {
if err := cs.PersistTipsets(ctx, []*types.TipSet{ts}); err != nil {
return xerrors.Errorf("failed to persist tipset: %w", err)
}
expanded, err := cs.expandTipset(ctx, ts.Blocks()[0])
if err != nil {
return xerrors.Errorf("errored while expanding tipset: %w", err)
}
if expanded.Key() != ts.Key() {
log.Debugf("expanded %s into %s\n", ts.Cids(), expanded.Cids())
tsBlk, err := expanded.Key().ToStorageBlock()
if err != nil {
return xerrors.Errorf("failed to get tipset key block: %w", err)
}
if err = cs.chainLocalBlockstore.Put(ctx, tsBlk); err != nil {
return xerrors.Errorf("failed to put tipset key block: %w", err)
}
}
if err := cs.MaybeTakeHeavierTipSet(ctx, expanded); err != nil {
return xerrors.Errorf("MaybeTakeHeavierTipSet failed in PutTipSet: %w", err)
}
return nil
}
// MaybeTakeHeavierTipSet evaluates the incoming tipset and locks it in our
// internal state as our new head, if and only if it is heavier than the current
// head and does not exceed the maximum fork length.
func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipSet) error {
// RefreshHeaviestTipSet receives a newTsHeight at which a new tipset might exist. It then:
// - "refreshes" the heaviest tipset that can be formed at its current heaviest height
// - if equivocation is detected among the miners of the current heaviest tipset, the head is immediately updated to the heaviest tipset that can be formed in a range of 5 epochs
//
// - forms the best tipset that can be formed at the _input_ height
// - compares the three tipset weights: "current" heaviest tipset, "refreshed" tipset, and best tipset at newTsHeight
// - updates "current" heaviest to the heaviest of those 3 tipsets (if an update is needed), assuming it doesn't violate the maximum fork rule
func (cs *ChainStore) RefreshHeaviestTipSet(ctx context.Context, newTsHeight abi.ChainEpoch) error {
for {
cs.heaviestLk.Lock()
if len(cs.reorgCh) < reorgChBuf/2 {
@ -426,39 +409,84 @@ func (cs *ChainStore) MaybeTakeHeavierTipSet(ctx context.Context, ts *types.TipS
defer cs.heaviestLk.Unlock()
if ts.Equals(cs.heaviest) {
heaviestWeight, err := cs.weight(ctx, cs.StateBlockstore(), cs.heaviest)
if err != nil {
return xerrors.Errorf("failed to calculate currentHeaviest's weight: %w", err)
}
heaviestHeight := abi.ChainEpoch(0)
if cs.heaviest != nil {
heaviestHeight = cs.heaviest.Height()
}
// Before we look at newTs, let's refresh best tipset at current head's height -- this is done to detect equivocation
newHeaviest, newHeaviestWeight, err := cs.FormHeaviestTipSetForHeight(ctx, heaviestHeight)
if err != nil {
return xerrors.Errorf("failed to reform head at same height: %w", err)
}
// Equivocation has occurred! We need a new head NOW!
if newHeaviest == nil || newHeaviestWeight.LessThan(heaviestWeight) {
log.Warnf("chainstore heaviest tipset's weight SHRANK from %d (%s) to %d (%s) due to equivocation", heaviestWeight, cs.heaviest, newHeaviestWeight, newHeaviest)
// refresh heaviestWeight 10 times moving up and down
for i := heaviestHeight + 5; i > heaviestHeight-5; i-- {
possibleHeaviestTs, possibleHeaviestWeight, err := cs.FormHeaviestTipSetForHeight(ctx, i)
if err != nil {
return xerrors.Errorf("failed to produce head at height %d: %w", i, err)
}
if possibleHeaviestWeight.GreaterThan(newHeaviestWeight) {
newHeaviestWeight = possibleHeaviestWeight
newHeaviest = possibleHeaviestTs
}
}
if newHeaviest == nil {
return xerrors.Errorf("failed to refresh to a new valid tipset")
}
errTake := cs.takeHeaviestTipSet(ctx, newHeaviest)
if errTake != nil {
return xerrors.Errorf("failed to take newHeaviest tipset as head: %w", err)
}
}
// if the new height we were notified about isn't what we just refreshed at, see if we have a heavier tipset there
if newTsHeight != newHeaviest.Height() {
bestTs, bestTsWeight, err := cs.FormHeaviestTipSetForHeight(ctx, newTsHeight)
if err != nil {
return xerrors.Errorf("failed to form new heaviest tipset at height %d: %w", newTsHeight, err)
}
heavier := bestTsWeight.GreaterThan(newHeaviestWeight)
if bestTsWeight.Equals(newHeaviestWeight) {
heavier = breakWeightTie(bestTs, newHeaviest)
}
if heavier {
newHeaviest = bestTs
}
}
// Everything's the same as before, exit early
if newHeaviest.Equals(cs.heaviest) {
return nil
}
w, err := cs.weight(ctx, cs.StateBlockstore(), ts)
// At this point, it MUST be true that newHeaviest is heavier than cs.heaviest -- update if fork allows
exceeds, err := cs.exceedsForkLength(ctx, cs.heaviest, newHeaviest)
if err != nil {
return err
return xerrors.Errorf("failed to check fork length: %w", err)
}
heaviestW, err := cs.weight(ctx, cs.StateBlockstore(), cs.heaviest)
if exceeds {
return nil
}
err = cs.takeHeaviestTipSet(ctx, newHeaviest)
if err != nil {
return err
}
heavier := w.GreaterThan(heaviestW)
if w.Equals(heaviestW) && !ts.Equals(cs.heaviest) {
log.Errorw("weight draw", "currTs", cs.heaviest, "ts", ts)
heavier = breakWeightTie(ts, cs.heaviest)
}
if heavier {
// TODO: don't do this for initial sync. Now that we don't have a
// difference between 'bootstrap sync' and 'caught up' sync, we need
// some other heuristic.
exceeds, err := cs.exceedsForkLength(ctx, cs.heaviest, ts)
if err != nil {
return err
}
if exceeds {
return nil
}
return cs.takeHeaviestTipSet(ctx, ts)
return xerrors.Errorf("failed to take heaviest tipset: %w", err)
}
return nil
@ -655,6 +683,16 @@ func (cs *ChainStore) takeHeaviestTipSet(ctx context.Context, ts *types.TipSet)
return err
}
// write the tipsetkey block to the blockstore for EthAPI queries
tsBlk, err := ts.Key().ToStorageBlock()
if err != nil {
return xerrors.Errorf("failed to get tipset key block: %w", err)
}
if err = cs.chainLocalBlockstore.Put(ctx, tsBlk); err != nil {
return xerrors.Errorf("failed to put tipset key block: %w", err)
}
if prevHeaviest != nil { // buf
if len(cs.reorgCh) > 0 {
log.Warnf("Reorg channel running behind, %d reorgs buffered", len(cs.reorgCh))
@ -975,6 +1013,7 @@ func (cs *ChainStore) AddToTipSetTracker(ctx context.Context, b *types.BlockHead
return nil
}
// PersistTipsets writes the provided blocks and the TipSetKey objects to the blockstore
func (cs *ChainStore) PersistTipsets(ctx context.Context, tipsets []*types.TipSet) error {
toPersist := make([]*types.BlockHeader, 0, len(tipsets)*int(build.BlocksPerEpoch))
tsBlks := make([]block.Block, 0, len(tipsets))
@ -1027,44 +1066,72 @@ func (cs *ChainStore) persistBlockHeaders(ctx context.Context, b ...*types.Block
return err
}
func (cs *ChainStore) expandTipset(ctx context.Context, b *types.BlockHeader) (*types.TipSet, error) {
// Hold lock for the whole function for now, if it becomes a problem we can
// fix pretty easily
// FormHeaviestTipSetForHeight looks up all valid blocks at a given height, and returns the heaviest tipset that can be made at that height
// It does not consider ANY blocks from miners that have "equivocated" (produced 2 blocks at the same height)
func (cs *ChainStore) FormHeaviestTipSetForHeight(ctx context.Context, height abi.ChainEpoch) (*types.TipSet, types.BigInt, error) {
cs.tstLk.Lock()
defer cs.tstLk.Unlock()
all := []*types.BlockHeader{b}
tsets, ok := cs.tipsets[b.Height]
blockCids, ok := cs.tipsets[height]
if !ok {
return types.NewTipSet(all)
return nil, types.NewInt(0), nil
}
inclMiners := map[address.Address]cid.Cid{b.Miner: b.Cid()}
for _, bhc := range tsets {
if bhc == b.Cid() {
continue
}
// First, identify "bad" miners for the height
seenMiners := map[address.Address]struct{}{}
badMiners := map[address.Address]struct{}{}
blocks := make([]*types.BlockHeader, 0, len(blockCids))
for _, bhc := range blockCids {
h, err := cs.GetBlock(ctx, bhc)
if err != nil {
return nil, xerrors.Errorf("failed to load block (%s) for tipset expansion: %w", bhc, err)
return nil, types.NewInt(0), xerrors.Errorf("failed to load block (%s) for tipset expansion: %w", bhc, err)
}
if cid, found := inclMiners[h.Miner]; found {
log.Warnf("Have multiple blocks from miner %s at height %d in our tipset cache %s-%s", h.Miner, h.Height, h.Cid(), cid)
if _, seen := seenMiners[h.Miner]; seen {
badMiners[h.Miner] = struct{}{}
continue
}
seenMiners[h.Miner] = struct{}{}
blocks = append(blocks, h)
}
if types.CidArrsEqual(h.Parents, b.Parents) {
all = append(all, h)
inclMiners[h.Miner] = bhc
// Next, group by parent tipset
formableTipsets := make(map[types.TipSetKey][]*types.BlockHeader, 0)
for _, h := range blocks {
if _, bad := badMiners[h.Miner]; bad {
continue
}
ptsk := types.NewTipSetKey(h.Parents...)
formableTipsets[ptsk] = append(formableTipsets[ptsk], h)
}
maxWeight := types.NewInt(0)
var maxTs *types.TipSet
for _, headers := range formableTipsets {
ts, err := types.NewTipSet(headers)
if err != nil {
return nil, types.NewInt(0), xerrors.Errorf("unexpected error forming tipset: %w", err)
}
weight, err := cs.Weight(ctx, ts)
if err != nil {
return nil, types.NewInt(0), xerrors.Errorf("failed to calculate weight: %w", err)
}
heavier := weight.GreaterThan(maxWeight)
if weight.Equals(maxWeight) {
heavier = breakWeightTie(ts, maxTs)
}
if heavier {
maxWeight = weight
maxTs = ts
}
}
// TODO: other validation...?
return types.NewTipSet(all)
return maxTs, maxWeight, nil
}
func (cs *ChainStore) GetGenesis(ctx context.Context) (*types.BlockHeader, error) {

View File

@ -10,6 +10,7 @@ import (
"github.com/ipfs/go-datastore"
"github.com/stretchr/testify/require"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/crypto"
@ -238,3 +239,94 @@ func TestChainExportImportFull(t *testing.T) {
}
}
}
func TestFormTipsetByHeight(t *testing.T) {
ctx := context.Background()
cg, err := gen.NewGenerator()
if err != nil {
t.Fatal(err)
}
var last *types.TipSet
for i := 0; i < 10; i++ {
ts, err := cg.NextTipSet()
if err != nil {
t.Fatal(err)
}
last = ts.TipSet.TipSet()
}
require.NotEmpty(t, last.Blocks())
blk1 := *last.Blocks()[0]
// quick check: asking to form tipset at latest height just returns head
bestHead, bestHeadWeight, err := cg.ChainStore().FormHeaviestTipSetForHeight(ctx, last.Height())
require.NoError(t, err)
require.Equal(t, last.Key(), bestHead.Key())
require.Contains(t, last.Cids(), blk1.Cid())
expectedWeight, err := cg.ChainStore().Weight(ctx, bestHead)
require.NoError(t, err)
require.Equal(t, expectedWeight, bestHeadWeight)
// add another block by a different miner -- it should get included in the best tipset
blk2 := blk1
blk1Miner, err := address.IDFromAddress(blk2.Miner)
require.NoError(t, err)
blk2.Miner, err = address.NewIDAddress(blk1Miner + 50)
require.NoError(t, err)
addBlockToTracker(t, cg.ChainStore(), &blk2)
bestHead, bestHeadWeight, err = cg.ChainStore().FormHeaviestTipSetForHeight(ctx, last.Height())
require.NoError(t, err)
for _, blkCid := range last.Cids() {
require.Contains(t, bestHead.Cids(), blkCid)
}
require.Contains(t, bestHead.Cids(), blk2.Cid())
expectedWeight, err = cg.ChainStore().Weight(ctx, bestHead)
require.NoError(t, err)
require.Equal(t, expectedWeight, bestHeadWeight)
// add another block by a different miner, but on a different tipset -- it should NOT get included
blk3 := blk1
blk3.Miner, err = address.NewIDAddress(blk1Miner + 100)
require.NoError(t, err)
blk3.Parents = append(blk3.Parents, blk1.Cid())
addBlockToTracker(t, cg.ChainStore(), &blk3)
bestHead, bestHeadWeight, err = cg.ChainStore().FormHeaviestTipSetForHeight(ctx, last.Height())
require.NoError(t, err)
for _, blkCid := range last.Cids() {
require.Contains(t, bestHead.Cids(), blkCid)
}
require.Contains(t, bestHead.Cids(), blk2.Cid())
require.NotContains(t, bestHead.Cids(), blk3.Cid())
expectedWeight, err = cg.ChainStore().Weight(ctx, bestHead)
require.NoError(t, err)
require.Equal(t, expectedWeight, bestHeadWeight)
// add another block by the same miner as blk1 -- it should NOT get included, and blk1 should be excluded too
blk4 := blk1
blk4.Timestamp = blk1.Timestamp + 1
addBlockToTracker(t, cg.ChainStore(), &blk4)
bestHead, bestHeadWeight, err = cg.ChainStore().FormHeaviestTipSetForHeight(ctx, last.Height())
require.NoError(t, err)
for _, blkCid := range last.Cids() {
if blkCid != blk1.Cid() {
require.Contains(t, bestHead.Cids(), blkCid)
}
}
require.NotContains(t, bestHead.Cids(), blk4.Cid())
require.NotContains(t, bestHead.Cids(), blk1.Cid())
expectedWeight, err = cg.ChainStore().Weight(ctx, bestHead)
require.NoError(t, err)
require.Equal(t, expectedWeight, bestHeadWeight)
}
func addBlockToTracker(t *testing.T, cs *store.ChainStore, blk *types.BlockHeader) {
blk2Ts, err := types.NewTipSet([]*types.BlockHeader{blk})
require.NoError(t, err)
require.NoError(t, cs.PersistTipsets(context.TODO(), []*types.TipSet{blk2Ts}))
require.NoError(t, cs.AddToTipSetTracker(context.TODO(), blk))
}

View File

@ -247,6 +247,7 @@ func (syncer *Syncer) InformNewHead(from peer.ID, fts *store.FullTipSet) bool {
return false
}
// TODO: this method name is a lie
syncer.syncmgr.SetPeerHead(ctx, from, fts.TipSet())
return true
}
@ -536,7 +537,7 @@ func (syncer *Syncer) Sync(ctx context.Context, maybeHead *types.TipSet) error {
// At this point we have accepted and synced to the new `maybeHead`
// (`StageSyncComplete`).
if err := syncer.store.PutTipSet(ctx, maybeHead); err != nil {
if err := syncer.store.RefreshHeaviestTipSet(ctx, maybeHead.Height()); err != nil {
span.AddAttributes(trace.StringAttribute("put_error", err.Error()))
span.SetStatus(trace.Status{
Code: 13,

View File

@ -311,7 +311,7 @@ func (tu *syncTestUtil) addSourceNode(gen int) {
for _, lastB := range lastTs.Blocks {
require.NoError(tu.t, cs.AddToTipSetTracker(context.Background(), lastB.Header))
}
err = cs.PutTipSet(tu.ctx, lastTs.TipSet())
err = cs.RefreshHeaviestTipSet(tu.ctx, lastTs.TipSet().Height())
require.NoError(tu.t, err)
tu.genesis = genesis