diff --git a/api/api_full.go b/api/api_full.go index 9d1d7ab63..034e8f996 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -157,10 +157,16 @@ type FullNode interface { // yet synced block headers. SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error) + // SyncCheckpoint marks a blocks as checkpointed, meaning that it won't ever fork away from it. + SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error + // SyncMarkBad marks a blocks as bad, meaning that it won't ever by synced. // Use with extreme caution. SyncMarkBad(ctx context.Context, bcid cid.Cid) error + // SyncUnmarkBad unmarks a blocks as bad, making it possible to be validated and synced again. + SyncUnmarkBad(ctx context.Context, bcid cid.Cid) error + // SyncCheckBad checks if a block was marked as bad, and if it was, returns // the reason. SyncCheckBad(ctx context.Context, bcid cid.Cid) (string, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 3cf9a0add..6361e60b3 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -105,7 +105,9 @@ type FullNodeStruct struct { SyncState func(context.Context) (*api.SyncState, error) `perm:"read"` SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"` SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"` + SyncCheckpoint func(ctx context.Context, key types.TipSetKey) error `perm:"admin"` SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"` + SyncUnmarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"` SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"` MpoolGetConfig func(context.Context) (*types.MpoolConfig, error) `perm:"read"` @@ -704,10 +706,18 @@ func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types. return c.Internal.SyncIncomingBlocks(ctx) } +func (c *FullNodeStruct) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error { + return c.Internal.SyncCheckpoint(ctx, tsk) +} + func (c *FullNodeStruct) SyncMarkBad(ctx context.Context, bcid cid.Cid) error { return c.Internal.SyncMarkBad(ctx, bcid) } +func (c *FullNodeStruct) SyncUnmarkBad(ctx context.Context, bcid cid.Cid) error { + return c.Internal.SyncUnmarkBad(ctx, bcid) +} + func (c *FullNodeStruct) SyncCheckBad(ctx context.Context, bcid cid.Cid) (string, error) { return c.Internal.SyncCheckBad(ctx, bcid) } diff --git a/chain/badtscache.go b/chain/badtscache.go index 103237307..3c5bf05ef 100644 --- a/chain/badtscache.go +++ b/chain/badtscache.go @@ -56,6 +56,10 @@ func (bts *BadBlockCache) Add(c cid.Cid, bbr BadBlockReason) { bts.badBlocks.Add(c, bbr) } +func (bts *BadBlockCache) Remove(c cid.Cid) { + bts.badBlocks.Remove(c) +} + func (bts *BadBlockCache) Has(c cid.Cid) (BadBlockReason, bool) { rval, ok := bts.badBlocks.Get(c) if !ok { diff --git a/chain/checkpoint.go b/chain/checkpoint.go new file mode 100644 index 000000000..8f99d73e4 --- /dev/null +++ b/chain/checkpoint.go @@ -0,0 +1,81 @@ +package chain + +import ( + "encoding/json" + + "github.com/filecoin-project/lotus/chain/types" + + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/ipfs/go-datastore" + "golang.org/x/xerrors" +) + +var CheckpointKey = datastore.NewKey("/chain/checks") + +func loadCheckpoint(ds dtypes.MetadataDS) (types.TipSetKey, error) { + haveChks, err := ds.Has(CheckpointKey) + if err != nil { + return types.EmptyTSK, err + } + + if !haveChks { + return types.EmptyTSK, nil + } + + tskBytes, err := ds.Get(CheckpointKey) + if err != nil { + return types.EmptyTSK, err + } + + var tsk types.TipSetKey + err = json.Unmarshal(tskBytes, &tsk) + if err != nil { + return types.EmptyTSK, err + } + + return tsk, err +} + +func (syncer *Syncer) SetCheckpoint(tsk types.TipSetKey) error { + if tsk == types.EmptyTSK { + return xerrors.Errorf("called with empty tsk") + } + + syncer.checkptLk.Lock() + defer syncer.checkptLk.Unlock() + + ts, err := syncer.ChainStore().LoadTipSet(tsk) + if err != nil { + return xerrors.Errorf("cannot find tipset: %w", err) + } + + hts := syncer.ChainStore().GetHeaviestTipSet() + anc, err := syncer.ChainStore().IsAncestorOf(ts, hts) + if err != nil { + return xerrors.Errorf("cannot determine whether checkpoint tipset is in main-chain: %w", err) + } + + if !hts.Equals(ts) && !anc { + return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err) + } + + tskBytes, err := json.Marshal(tsk) + if err != nil { + return err + } + + err = syncer.ds.Put(CheckpointKey, tskBytes) + if err != nil { + return err + } + + syncer.checkpt = tsk + + return nil +} + +func (syncer *Syncer) GetCheckpoint() types.TipSetKey { + syncer.checkptLk.Lock() + defer syncer.checkptLk.Unlock() + return syncer.checkpt +} diff --git a/chain/sync.go b/chain/sync.go index d2cf08b92..d64e68055 100644 --- a/chain/sync.go +++ b/chain/sync.go @@ -9,8 +9,11 @@ import ( "sort" "strconv" "strings" + "sync" "time" + "github.com/filecoin-project/lotus/node/modules/dtypes" + "github.com/filecoin-project/specs-actors/actors/runtime/proof" "github.com/Gurpartap/async" @@ -129,10 +132,16 @@ type Syncer struct { windowSize int tickerCtxCancel context.CancelFunc + + checkptLk sync.Mutex + + checkpt types.TipSetKey + + ds dtypes.MetadataDS } // NewSyncer creates a new Syncer object. -func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { +func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) { gen, err := sm.ChainStore().GetGenesis() if err != nil { return nil, xerrors.Errorf("getting genesis block: %w", err) @@ -143,7 +152,14 @@ func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr return nil, err } + cp, err := loadCheckpoint(ds) + if err != nil { + return nil, xerrors.Errorf("error loading mpool config: %w", err) + } + s := &Syncer{ + ds: ds, + checkpt: cp, beacon: beacon, bad: NewBadBlockCache(), Genesis: gent, @@ -1361,7 +1377,7 @@ loop: log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height()) fork, err := syncer.syncFork(ctx, base, known) if err != nil { - if xerrors.Is(err, ErrForkTooLong) { + if xerrors.Is(err, ErrForkTooLong) || xerrors.Is(err, ErrForkCheckpoint) { // TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish? log.Warn("adding forked chain to our bad tipset cache") for _, b := range incoming.Blocks() { @@ -1377,14 +1393,23 @@ loop: } var ErrForkTooLong = fmt.Errorf("fork longer than threshold") +var ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block") // syncFork tries to obtain the chain fragment that links a fork into a common // ancestor in our view of the chain. // -// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the -// denylist. Else, we find the common ancestor, and add the missing chain +// If the fork is too long (build.ForkLengthThreshold), or would cause us to diverge from the checkpoint (ErrForkCheckpoint), +// we add the entire subchain to the denylist. Else, we find the common ancestor, and add the missing chain // fragment until the fork point to the returned []TipSet. func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) { + + chkpt := syncer.GetCheckpoint() + if known.Key() == chkpt { + return nil, ErrForkCheckpoint + } + + // TODO: Does this mean we always ask for ForkLengthThreshold blocks from the network, even if we just need, like, 2? + // Would it not be better to ask in smaller chunks, given that an ~ForkLengthThreshold is very rare? tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold)) if err != nil { return nil, err @@ -1410,12 +1435,18 @@ func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, know if nts.Height() < tips[cur].Height() { cur++ } else { + // We will be forking away from nts, check that it isn't checkpointed + if nts.Key() == chkpt { + return nil, ErrForkCheckpoint + } + nts, err = syncer.store.LoadTipSet(nts.Parents()) if err != nil { return nil, xerrors.Errorf("loading next local tipset: %w", err) } } } + return nil, ErrForkTooLong } @@ -1644,6 +1675,11 @@ func (syncer *Syncer) MarkBad(blk cid.Cid) { syncer.bad.Add(blk, NewBadBlockReason([]cid.Cid{blk}, "manually marked bad")) } +// UnmarkBad manually adds a block to the "bad blocks" cache. +func (syncer *Syncer) UnmarkBad(blk cid.Cid) { + syncer.bad.Remove(blk) +} + func (syncer *Syncer) CheckBadBlockCache(blk cid.Cid) (string, bool) { bbr, ok := syncer.bad.Has(blk) return bbr.String(), ok diff --git a/chain/sync_test.go b/chain/sync_test.go index f91929a02..92873118a 100644 --- a/chain/sync_test.go +++ b/chain/sync_test.go @@ -333,6 +333,36 @@ func (tu *syncTestUtil) compareSourceState(with int) { } } +func (tu *syncTestUtil) assertBad(node int, ts *types.TipSet) { + for _, blk := range ts.Cids() { + rsn, err := tu.nds[node].SyncCheckBad(context.TODO(), blk) + require.NoError(tu.t, err) + require.True(tu.t, len(rsn) != 0) + } +} + +func (tu *syncTestUtil) getHead(node int) *types.TipSet { + ts, err := tu.nds[node].ChainHead(context.TODO()) + require.NoError(tu.t, err) + return ts +} + +func (tu *syncTestUtil) checkpointTs(node int, tsk types.TipSetKey) { + require.NoError(tu.t, tu.nds[node].SyncCheckpoint(context.TODO(), tsk)) +} + +func (tu *syncTestUtil) waitUntilNodeHasTs(node int, tsk types.TipSetKey) { + for { + _, err := tu.nds[node].ChainGetTipSet(context.TODO(), tsk) + if err != nil { + break + } + } + + // Time to allow for syncing and validation + time.Sleep(2 * time.Second) +} + func (tu *syncTestUtil) waitUntilSync(from, to int) { target, err := tu.nds[from].ChainHead(tu.ctx) if err != nil { @@ -678,3 +708,45 @@ func TestSyncInputs(t *testing.T) { t.Fatal("should error on block with nil election proof") } } + +func TestSyncCheckpoint(t *testing.T) { + H := 10 + tu := prepSyncTest(t, H) + + p1 := tu.addClientNode() + p2 := tu.addClientNode() + + fmt.Println("GENESIS: ", tu.g.Genesis().Cid()) + tu.loadChainToNode(p1) + tu.loadChainToNode(p2) + + base := tu.g.CurTipset + fmt.Println("Mining base: ", base.TipSet().Cids(), base.TipSet().Height()) + + // The two nodes fork at this point into 'a' and 'b' + a1 := tu.mineOnBlock(base, p1, []int{0}, true, false, nil) + a := tu.mineOnBlock(a1, p1, []int{0}, true, false, nil) + a = tu.mineOnBlock(a, p1, []int{0}, true, false, nil) + + tu.waitUntilSyncTarget(p1, a.TipSet()) + tu.checkpointTs(p1, a.TipSet().Key()) + + require.NoError(t, tu.g.ResyncBankerNonce(a1.TipSet())) + // chain B will now be heaviest + b := tu.mineOnBlock(base, p2, []int{1}, true, false, nil) + b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil) + b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil) + b = tu.mineOnBlock(b, p2, []int{1}, true, false, nil) + + fmt.Println("A: ", a.Cids(), a.TipSet().Height()) + fmt.Println("B: ", b.Cids(), b.TipSet().Height()) + + // Now for the fun part!! p1 should mark p2's head as BAD. + + require.NoError(t, tu.mn.LinkAll()) + tu.connect(p1, p2) + tu.waitUntilNodeHasTs(p1, b.TipSet().Key()) + p1Head := tu.getHead(p1) + require.Equal(tu.t, p1Head, a.TipSet()) + tu.assertBad(p1, b.TipSet()) +} diff --git a/cli/chain.go b/cli/chain.go index d00247364..ce1660641 100644 --- a/cli/chain.go +++ b/cli/chain.go @@ -337,25 +337,6 @@ var chainSetHeadCmd = &cli.Command{ }, } -func parseTipSet(ctx context.Context, api api.FullNode, vals []string) (*types.TipSet, error) { - var headers []*types.BlockHeader - for _, c := range vals { - blkc, err := cid.Decode(c) - if err != nil { - return nil, err - } - - bh, err := api.ChainGetBlock(ctx, blkc) - if err != nil { - return nil, err - } - - headers = append(headers, bh) - } - - return types.NewTipSet(headers) -} - var chainListCmd = &cli.Command{ Name: "list", Usage: "View a segment of the chain", diff --git a/cli/sync.go b/cli/sync.go index a92cd9437..bff34960e 100644 --- a/cli/sync.go +++ b/cli/sync.go @@ -5,6 +5,8 @@ import ( "fmt" "time" + "github.com/filecoin-project/lotus/chain/types" + "github.com/filecoin-project/go-state-types/abi" cid "github.com/ipfs/go-cid" "github.com/urfave/cli/v2" @@ -20,7 +22,9 @@ var syncCmd = &cli.Command{ syncStatusCmd, syncWaitCmd, syncMarkBadCmd, + syncUnmarkBadCmd, syncCheckBadCmd, + syncCheckpointCmd, }, } @@ -117,6 +121,31 @@ var syncMarkBadCmd = &cli.Command{ }, } +var syncUnmarkBadCmd = &cli.Command{ + Name: "unmark-bad", + Usage: "Unmark the given block as bad, makes it possible to sync to a chain containing it", + ArgsUsage: "[blockCid]", + Action: func(cctx *cli.Context) error { + napi, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + if !cctx.Args().Present() { + return fmt.Errorf("must specify block cid to unmark") + } + + bcid, err := cid.Decode(cctx.Args().First()) + if err != nil { + return fmt.Errorf("failed to decode input as a cid: %s", err) + } + + return napi.SyncUnmarkBad(ctx, bcid) + }, +} + var syncCheckBadCmd = &cli.Command{ Name: "check-bad", Usage: "check if the given block was marked bad, and for what reason", @@ -153,6 +182,48 @@ var syncCheckBadCmd = &cli.Command{ }, } +var syncCheckpointCmd = &cli.Command{ + Name: "checkpoint", + Usage: "mark a certain tipset as checkpointed; the node will never fork away from this tipset", + ArgsUsage: "[tipsetKey]", + Flags: []cli.Flag{ + &cli.Uint64Flag{ + Name: "epoch", + Usage: "checkpoint the tipset at the given epoch", + }, + }, + Action: func(cctx *cli.Context) error { + napi, closer, err := GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := ReqContext(cctx) + + var ts *types.TipSet + + if cctx.IsSet("epoch") { + ts, err = napi.ChainGetTipSetByHeight(ctx, abi.ChainEpoch(cctx.Uint64("epoch")), types.EmptyTSK) + } + if ts == nil { + ts, err = parseTipSet(ctx, napi, cctx.Args().Slice()) + } + if err != nil { + return err + } + + if ts == nil { + return fmt.Errorf("must pass cids for tipset to set as head, or specify epoch flag") + } + + if err := napi.SyncCheckpoint(ctx, ts.Key()); err != nil { + return err + } + + return nil + }, +} + func SyncWait(ctx context.Context, napi api.FullNode) error { for { state, err := napi.SyncState(ctx) diff --git a/cli/util.go b/cli/util.go new file mode 100644 index 000000000..4371f8bbc --- /dev/null +++ b/cli/util.go @@ -0,0 +1,28 @@ +package cli + +import ( + "context" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + "github.com/ipfs/go-cid" +) + +func parseTipSet(ctx context.Context, api api.FullNode, vals []string) (*types.TipSet, error) { + var headers []*types.BlockHeader + for _, c := range vals { + blkc, err := cid.Decode(c) + if err != nil { + return nil, err + } + + bh, err := api.ChainGetBlock(ctx, blkc) + if err != nil { + return nil, err + } + + headers = append(headers, bh) + } + + return types.NewTipSet(headers) +} diff --git a/documentation/en/api-methods.md b/documentation/en/api-methods.md index 64f0d272a..10b870e14 100644 --- a/documentation/en/api-methods.md +++ b/documentation/en/api-methods.md @@ -156,10 +156,12 @@ * [StateWaitMsg](#StateWaitMsg) * [Sync](#Sync) * [SyncCheckBad](#SyncCheckBad) + * [SyncCheckpoint](#SyncCheckpoint) * [SyncIncomingBlocks](#SyncIncomingBlocks) * [SyncMarkBad](#SyncMarkBad) * [SyncState](#SyncState) * [SyncSubmitBlock](#SyncSubmitBlock) + * [SyncUnmarkBad](#SyncUnmarkBad) * [Wallet](#Wallet) * [WalletBalance](#WalletBalance) * [WalletDefaultAddress](#WalletDefaultAddress) @@ -3995,6 +3997,28 @@ Inputs: Response: `"string value"` +### SyncCheckpoint +SyncCheckpoint marks a blocks as checkpointed, meaning that it won't ever fork away from it. + + +Perms: admin + +Inputs: +```json +[ + [ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + }, + { + "/": "bafy2bzacebp3shtrn43k7g3unredz7fxn4gj533d3o43tqn2p2ipxxhrvchve" + } + ] +] +``` + +Response: `{}` + ### SyncIncomingBlocks SyncIncomingBlocks returns a channel streaming incoming, potentially not yet synced block headers. @@ -4130,6 +4154,23 @@ Inputs: Response: `{}` +### SyncUnmarkBad +SyncUnmarkBad unmarks a blocks as bad, making it possible to be validated and synced again. + + +Perms: admin + +Inputs: +```json +[ + { + "/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4" + } +] +``` + +Response: `{}` + ## Wallet diff --git a/documentation/en/architecture.md b/documentation/en/architecture.md index 619e04f05..ca4789fa0 100644 --- a/documentation/en/architecture.md +++ b/documentation/en/architecture.md @@ -259,7 +259,7 @@ When we launch a Lotus node with the command `./lotus daemon` (see [here](https://github.com/filecoin-project/lotus/blob/master/cmd/lotus/daemon.go) for more), the node is created through [dependency injection](https://godoc.org/go.uber.org/fx). This relies on reflection, which makes some of the references hard to follow. -The node sets up all of the subsystems it needs to run, such as the repository, the network connections, thechain sync +The node sets up all of the subsystems it needs to run, such as the repository, the network connections, the chain sync service, etc. This setup is orchestrated through calls to the `node.Override` function. The structure of each call indicates the type of component it will set up diff --git a/node/impl/full/sync.go b/node/impl/full/sync.go index 9066df56f..7f7fd48be 100644 --- a/node/impl/full/sync.go +++ b/node/impl/full/sync.go @@ -97,12 +97,23 @@ func (a *SyncAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHe return a.Syncer.IncomingBlocks(ctx) } +func (a *SyncAPI) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error { + log.Warnf("Marking tipset %s as bad", tsk) + return a.Syncer.SetCheckpoint(tsk) +} + func (a *SyncAPI) SyncMarkBad(ctx context.Context, bcid cid.Cid) error { log.Warnf("Marking block %s as bad", bcid) a.Syncer.MarkBad(bcid) return nil } +func (a *SyncAPI) SyncUnmarkBad(ctx context.Context, bcid cid.Cid) error { + log.Warnf("Unmarking block %s as bad", bcid) + a.Syncer.UnmarkBad(bcid) + return nil +} + func (a *SyncAPI) SyncCheckBad(ctx context.Context, bcid cid.Cid) (string, error) { reason, ok := a.Syncer.CheckBadBlockCache(bcid) if !ok { diff --git a/node/modules/chain.go b/node/modules/chain.go index 1f398d0d8..cc86156b6 100644 --- a/node/modules/chain.go +++ b/node/modules/chain.go @@ -163,8 +163,8 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore, return netName, err } -func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) { - syncer, err := chain.NewSyncer(sm, exchange, h.ConnManager(), h.ID(), beacon, verifier) +func NewSyncer(lc fx.Lifecycle, ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) { + syncer, err := chain.NewSyncer(ds, sm, exchange, h.ConnManager(), h.ID(), beacon, verifier) if err != nil { return nil, err }