Allow marking a certain tipset as checkpointed
This commit is contained in:
parent
a2f1e76fea
commit
35aa78dad9
@ -157,6 +157,9 @@ type FullNode interface {
|
|||||||
// yet synced block headers.
|
// yet synced block headers.
|
||||||
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error)
|
SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHeader, error)
|
||||||
|
|
||||||
|
// SyncCheckpoint marks a blocks as checkpointed, meaning that it won't ever fork away from it.
|
||||||
|
SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error
|
||||||
|
|
||||||
// SyncMarkBad marks a blocks as bad, meaning that it won't ever by synced.
|
// SyncMarkBad marks a blocks as bad, meaning that it won't ever by synced.
|
||||||
// Use with extreme caution.
|
// Use with extreme caution.
|
||||||
SyncMarkBad(ctx context.Context, bcid cid.Cid) error
|
SyncMarkBad(ctx context.Context, bcid cid.Cid) error
|
||||||
|
@ -105,6 +105,7 @@ type FullNodeStruct struct {
|
|||||||
SyncState func(context.Context) (*api.SyncState, error) `perm:"read"`
|
SyncState func(context.Context) (*api.SyncState, error) `perm:"read"`
|
||||||
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
|
SyncSubmitBlock func(ctx context.Context, blk *types.BlockMsg) error `perm:"write"`
|
||||||
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
|
SyncIncomingBlocks func(ctx context.Context) (<-chan *types.BlockHeader, error) `perm:"read"`
|
||||||
|
SyncCheckpoint func(ctx context.Context, key types.TipSetKey) error `perm:"admin"`
|
||||||
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
|
SyncMarkBad func(ctx context.Context, bcid cid.Cid) error `perm:"admin"`
|
||||||
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`
|
SyncCheckBad func(ctx context.Context, bcid cid.Cid) (string, error) `perm:"read"`
|
||||||
|
|
||||||
@ -704,6 +705,10 @@ func (c *FullNodeStruct) SyncIncomingBlocks(ctx context.Context) (<-chan *types.
|
|||||||
return c.Internal.SyncIncomingBlocks(ctx)
|
return c.Internal.SyncIncomingBlocks(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *FullNodeStruct) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error {
|
||||||
|
return c.Internal.SyncCheckpoint(ctx, tsk)
|
||||||
|
}
|
||||||
|
|
||||||
func (c *FullNodeStruct) SyncMarkBad(ctx context.Context, bcid cid.Cid) error {
|
func (c *FullNodeStruct) SyncMarkBad(ctx context.Context, bcid cid.Cid) error {
|
||||||
return c.Internal.SyncMarkBad(ctx, bcid)
|
return c.Internal.SyncMarkBad(ctx, bcid)
|
||||||
}
|
}
|
||||||
|
81
chain/checkpoint.go
Normal file
81
chain/checkpoint.go
Normal file
@ -0,0 +1,81 @@
|
|||||||
|
package chain
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
"github.com/ipfs/go-datastore"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
)
|
||||||
|
|
||||||
|
var CheckpointKey = datastore.NewKey("/chain/checks")
|
||||||
|
|
||||||
|
func loadCheckpoint(ds dtypes.MetadataDS) (types.TipSetKey, error) {
|
||||||
|
haveChks, err := ds.Has(CheckpointKey)
|
||||||
|
if err != nil {
|
||||||
|
return types.EmptyTSK, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !haveChks {
|
||||||
|
return types.EmptyTSK, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
tskBytes, err := ds.Get(CheckpointKey)
|
||||||
|
if err != nil {
|
||||||
|
return types.EmptyTSK, err
|
||||||
|
}
|
||||||
|
|
||||||
|
var tsk types.TipSetKey
|
||||||
|
err = json.Unmarshal(tskBytes, &tsk)
|
||||||
|
if err != nil {
|
||||||
|
return types.EmptyTSK, err
|
||||||
|
}
|
||||||
|
|
||||||
|
return tsk, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) SetCheckpoint(tsk types.TipSetKey) error {
|
||||||
|
if tsk == types.EmptyTSK {
|
||||||
|
return xerrors.Errorf("called with empty tsk")
|
||||||
|
}
|
||||||
|
|
||||||
|
syncer.checkptLk.Lock()
|
||||||
|
defer syncer.checkptLk.Unlock()
|
||||||
|
|
||||||
|
ts, err := syncer.ChainStore().LoadTipSet(tsk)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("cannot find tipset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
hts := syncer.ChainStore().GetHeaviestTipSet()
|
||||||
|
anc, err := syncer.ChainStore().IsAncestorOf(ts, hts)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("cannot determine whether checkpoint tipset is in main-chain: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !hts.Equals(ts) && !anc {
|
||||||
|
return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
tskBytes, err := json.Marshal(tsk)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
err = syncer.ds.Put(CheckpointKey, tskBytes)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
syncer.checkpt = tsk
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (syncer *Syncer) GetCheckpoint() types.TipSetKey {
|
||||||
|
syncer.checkptLk.Lock()
|
||||||
|
defer syncer.checkptLk.Unlock()
|
||||||
|
return syncer.checkpt
|
||||||
|
}
|
@ -9,8 +9,11 @@ import (
|
|||||||
"sort"
|
"sort"
|
||||||
"strconv"
|
"strconv"
|
||||||
"strings"
|
"strings"
|
||||||
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||||
|
|
||||||
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
|
"github.com/filecoin-project/specs-actors/actors/runtime/proof"
|
||||||
|
|
||||||
"github.com/Gurpartap/async"
|
"github.com/Gurpartap/async"
|
||||||
@ -129,10 +132,16 @@ type Syncer struct {
|
|||||||
windowSize int
|
windowSize int
|
||||||
|
|
||||||
tickerCtxCancel context.CancelFunc
|
tickerCtxCancel context.CancelFunc
|
||||||
|
|
||||||
|
checkptLk sync.Mutex
|
||||||
|
|
||||||
|
checkpt types.TipSetKey
|
||||||
|
|
||||||
|
ds dtypes.MetadataDS
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewSyncer creates a new Syncer object.
|
// NewSyncer creates a new Syncer object.
|
||||||
func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
|
func NewSyncer(ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr.ConnManager, self peer.ID, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*Syncer, error) {
|
||||||
gen, err := sm.ChainStore().GetGenesis()
|
gen, err := sm.ChainStore().GetGenesis()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, xerrors.Errorf("getting genesis block: %w", err)
|
return nil, xerrors.Errorf("getting genesis block: %w", err)
|
||||||
@ -143,7 +152,14 @@ func NewSyncer(sm *stmgr.StateManager, exchange exchange.Client, connmgr connmgr
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
cp, err := loadCheckpoint(ds)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("error loading mpool config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
s := &Syncer{
|
s := &Syncer{
|
||||||
|
ds: ds,
|
||||||
|
checkpt: cp,
|
||||||
beacon: beacon,
|
beacon: beacon,
|
||||||
bad: NewBadBlockCache(),
|
bad: NewBadBlockCache(),
|
||||||
Genesis: gent,
|
Genesis: gent,
|
||||||
@ -1361,7 +1377,7 @@ loop:
|
|||||||
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
|
log.Warnf("(fork detected) synced header chain (%s - %d) does not link to our best block (%s - %d)", incoming.Cids(), incoming.Height(), known.Cids(), known.Height())
|
||||||
fork, err := syncer.syncFork(ctx, base, known)
|
fork, err := syncer.syncFork(ctx, base, known)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if xerrors.Is(err, ErrForkTooLong) {
|
if xerrors.Is(err, ErrForkTooLong) || xerrors.Is(err, ErrForkCheckpoint) {
|
||||||
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
|
// TODO: we're marking this block bad in the same way that we mark invalid blocks bad. Maybe distinguish?
|
||||||
log.Warn("adding forked chain to our bad tipset cache")
|
log.Warn("adding forked chain to our bad tipset cache")
|
||||||
for _, b := range incoming.Blocks() {
|
for _, b := range incoming.Blocks() {
|
||||||
@ -1377,14 +1393,17 @@ loop:
|
|||||||
}
|
}
|
||||||
|
|
||||||
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
|
var ErrForkTooLong = fmt.Errorf("fork longer than threshold")
|
||||||
|
var ErrForkCheckpoint = fmt.Errorf("fork would require us to diverge from checkpointed block")
|
||||||
|
|
||||||
// syncFork tries to obtain the chain fragment that links a fork into a common
|
// syncFork tries to obtain the chain fragment that links a fork into a common
|
||||||
// ancestor in our view of the chain.
|
// ancestor in our view of the chain.
|
||||||
//
|
//
|
||||||
// If the fork is too long (build.ForkLengthThreshold), we add the entire subchain to the
|
// If the fork is too long (build.ForkLengthThreshold), or would cause us to diverge from the checkpoint (ErrForkCheckpoint),
|
||||||
// denylist. Else, we find the common ancestor, and add the missing chain
|
// we add the entire subchain to the denylist. Else, we find the common ancestor, and add the missing chain
|
||||||
// fragment until the fork point to the returned []TipSet.
|
// fragment until the fork point to the returned []TipSet.
|
||||||
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
|
func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, known *types.TipSet) ([]*types.TipSet, error) {
|
||||||
|
// TODO: Does this mean we always ask for ForkLengthThreshold blocks from the network, even if we just need, like, 2?
|
||||||
|
// Would it not be better to ask in smaller chunks, given that an ~ForkLengthThreshold is very rare?
|
||||||
tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
|
tips, err := syncer.Exchange.GetBlocks(ctx, incoming.Parents(), int(build.ForkLengthThreshold))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
@ -1404,6 +1423,22 @@ func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, know
|
|||||||
}
|
}
|
||||||
|
|
||||||
if nts.Equals(tips[cur]) {
|
if nts.Equals(tips[cur]) {
|
||||||
|
// We've identified the ancestor
|
||||||
|
// Still need to make sure this wouldn't cause us to fork away from the checkpointed tipset
|
||||||
|
|
||||||
|
chkpt := syncer.GetCheckpoint()
|
||||||
|
|
||||||
|
if chkpt != types.EmptyTSK {
|
||||||
|
chkTs, err := syncer.ChainStore().LoadTipSet(chkpt)
|
||||||
|
if err != nil {
|
||||||
|
return nil, xerrors.Errorf("failed to retrieve checkpoint tipset: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if chkTs.Height() > nts.Height() {
|
||||||
|
return nil, ErrForkCheckpoint
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return tips[:cur+1], nil
|
return tips[:cur+1], nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1416,6 +1451,7 @@ func (syncer *Syncer) syncFork(ctx context.Context, incoming *types.TipSet, know
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, ErrForkTooLong
|
return nil, ErrForkTooLong
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -97,6 +97,11 @@ func (a *SyncAPI) SyncIncomingBlocks(ctx context.Context) (<-chan *types.BlockHe
|
|||||||
return a.Syncer.IncomingBlocks(ctx)
|
return a.Syncer.IncomingBlocks(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (a *SyncAPI) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) error {
|
||||||
|
log.Warnf("Marking tipset %s as bad", tsk)
|
||||||
|
return a.Syncer.SetCheckpoint(tsk)
|
||||||
|
}
|
||||||
|
|
||||||
func (a *SyncAPI) SyncMarkBad(ctx context.Context, bcid cid.Cid) error {
|
func (a *SyncAPI) SyncMarkBad(ctx context.Context, bcid cid.Cid) error {
|
||||||
log.Warnf("Marking block %s as bad", bcid)
|
log.Warnf("Marking block %s as bad", bcid)
|
||||||
a.Syncer.MarkBad(bcid)
|
a.Syncer.MarkBad(bcid)
|
||||||
|
@ -163,8 +163,8 @@ func NetworkName(mctx helpers.MetricsCtx, lc fx.Lifecycle, cs *store.ChainStore,
|
|||||||
return netName, err
|
return netName, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func NewSyncer(lc fx.Lifecycle, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) {
|
func NewSyncer(lc fx.Lifecycle, ds dtypes.MetadataDS, sm *stmgr.StateManager, exchange exchange.Client, h host.Host, beacon beacon.RandomBeacon, verifier ffiwrapper.Verifier) (*chain.Syncer, error) {
|
||||||
syncer, err := chain.NewSyncer(sm, exchange, h.ConnManager(), h.ID(), beacon, verifier)
|
syncer, err := chain.NewSyncer(ds, sm, exchange, h.ConnManager(), h.ID(), beacon, verifier)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user