fix: sync: atomically switch chains when checkpointing (#11595)
We can now atomically switch chains when checkpointing. Previously, we'd call `SetHead` followed by `SetCheckpoint`. Unfortunately, that's not atomic and the "head" could have reverted before we called `SetCheckpoint` (causing the latter to fail). Now, we just call `SetCheckpoint` and let `SetCheckpoint` adjust our head. This changes the behavior of `ChainStore.SetCheckpoint`, but `Syncer.SyncCheckpoint` is the only caller anyways.
This commit is contained in:
parent
057cef5b05
commit
6cbeb9aad6
@ -24,8 +24,15 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e
|
|||||||
ts = tss[0]
|
ts = tss[0]
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.switchChain(ctx, ts); err != nil {
|
hts := syncer.ChainStore().GetHeaviestTipSet()
|
||||||
return xerrors.Errorf("failed to switch chain when syncing checkpoint: %w", err)
|
if hts.Equals(ts) {
|
||||||
|
// Current head, no need to switch.
|
||||||
|
} else if anc, err := syncer.store.IsAncestorOf(ctx, ts, hts); err != nil {
|
||||||
|
return xerrors.Errorf("failed to walk the chain when checkpointing: %w", err)
|
||||||
|
} else if anc {
|
||||||
|
// New checkpoint is on the current chain, we definitely have the tipsets.
|
||||||
|
} else if err := syncer.collectChain(ctx, ts, hts, true); err != nil {
|
||||||
|
return xerrors.Errorf("failed to collect chain for checkpoint: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := syncer.ChainStore().SetCheckpoint(ctx, ts); err != nil {
|
if err := syncer.ChainStore().SetCheckpoint(ctx, ts); err != nil {
|
||||||
@ -34,24 +41,3 @@ func (syncer *Syncer) SyncCheckpoint(ctx context.Context, tsk types.TipSetKey) e
|
|||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (syncer *Syncer) switchChain(ctx context.Context, ts *types.TipSet) error {
|
|
||||||
hts := syncer.ChainStore().GetHeaviestTipSet()
|
|
||||||
if hts.Equals(ts) {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
if anc, err := syncer.store.IsAncestorOf(ctx, ts, hts); err == nil && anc {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, sync the chain and set the head.
|
|
||||||
if err := syncer.collectChain(ctx, ts, hts, true); err != nil {
|
|
||||||
return xerrors.Errorf("failed to collect chain for checkpoint: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := syncer.ChainStore().SetHead(ctx, ts); err != nil {
|
|
||||||
return xerrors.Errorf("failed to set the chain head: %w", err)
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
@ -44,22 +44,14 @@ func TestChainCheckpoint(t *testing.T) {
|
|||||||
head := cs.GetHeaviestTipSet()
|
head := cs.GetHeaviestTipSet()
|
||||||
require.True(t, head.Equals(checkpointParents))
|
require.True(t, head.Equals(checkpointParents))
|
||||||
|
|
||||||
// Try to set the checkpoint in the future, it should fail.
|
// Checkpoint into the future.
|
||||||
err = cs.SetCheckpoint(ctx, checkpoint)
|
err = cs.SetCheckpoint(ctx, checkpoint)
|
||||||
require.Error(t, err)
|
|
||||||
|
|
||||||
// Then move the head back.
|
|
||||||
err = cs.SetHead(ctx, checkpoint)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
// Verify it worked.
|
// And verify that it worked.
|
||||||
head = cs.GetHeaviestTipSet()
|
head = cs.GetHeaviestTipSet()
|
||||||
require.True(t, head.Equals(checkpoint))
|
require.True(t, head.Equals(checkpoint))
|
||||||
|
|
||||||
// And checkpoint it.
|
|
||||||
err = cs.SetCheckpoint(ctx, checkpoint)
|
|
||||||
require.NoError(t, err)
|
|
||||||
|
|
||||||
// Let the second miner miner mine a fork
|
// Let the second miner miner mine a fork
|
||||||
last = checkpointParents
|
last = checkpointParents
|
||||||
for i := 0; i < 4; i++ {
|
for i := 0; i < 4; i++ {
|
||||||
@ -85,11 +77,10 @@ func TestChainCheckpoint(t *testing.T) {
|
|||||||
head = cs.GetHeaviestTipSet()
|
head = cs.GetHeaviestTipSet()
|
||||||
require.True(t, head.Equals(last))
|
require.True(t, head.Equals(last))
|
||||||
|
|
||||||
// Setting a checkpoint on the other fork should fail.
|
// We should switch back if we checkpoint again.
|
||||||
err = cs.SetCheckpoint(ctx, checkpoint)
|
err = cs.SetCheckpoint(ctx, checkpoint)
|
||||||
require.Error(t, err)
|
|
||||||
|
|
||||||
// Setting a checkpoint on this fork should succeed.
|
|
||||||
err = cs.SetCheckpoint(ctx, checkpointParents)
|
|
||||||
require.NoError(t, err)
|
require.NoError(t, err)
|
||||||
|
|
||||||
|
head = cs.GetHeaviestTipSet()
|
||||||
|
require.True(t, head.Equals(checkpoint))
|
||||||
}
|
}
|
||||||
|
@ -793,9 +793,12 @@ func (cs *ChainStore) removeCheckpoint(ctx context.Context) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// SetCheckpoint will set a checkpoint past which the chainstore will not allow forks.
|
// SetCheckpoint will set a checkpoint past which the chainstore will not allow forks. If the new
|
||||||
|
// checkpoint is not an ancestor of the current head, head will be set to the new checkpoint.
|
||||||
//
|
//
|
||||||
// NOTE: Checkpoints cannot be set beyond ForkLengthThreshold epochs in the past.
|
// NOTE: Checkpoints cannot be set beyond ForkLengthThreshold epochs in the past, but can be set
|
||||||
|
// arbitrarily far into the future.
|
||||||
|
// NOTE: The new checkpoint must already be synced.
|
||||||
func (cs *ChainStore) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
|
func (cs *ChainStore) SetCheckpoint(ctx context.Context, ts *types.TipSet) error {
|
||||||
tskBytes, err := json.Marshal(ts.Key())
|
tskBytes, err := json.Marshal(ts.Key())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -805,10 +808,6 @@ func (cs *ChainStore) SetCheckpoint(ctx context.Context, ts *types.TipSet) error
|
|||||||
cs.heaviestLk.Lock()
|
cs.heaviestLk.Lock()
|
||||||
defer cs.heaviestLk.Unlock()
|
defer cs.heaviestLk.Unlock()
|
||||||
|
|
||||||
if ts.Height() > cs.heaviest.Height() {
|
|
||||||
return xerrors.Errorf("cannot set a checkpoint in the future")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Otherwise, this operation could get _very_ expensive.
|
// Otherwise, this operation could get _very_ expensive.
|
||||||
if cs.heaviest.Height()-ts.Height() > build.ForkLengthThreshold {
|
if cs.heaviest.Height()-ts.Height() > build.ForkLengthThreshold {
|
||||||
return xerrors.Errorf("cannot set a checkpoint before the fork threshold")
|
return xerrors.Errorf("cannot set a checkpoint before the fork threshold")
|
||||||
@ -821,7 +820,9 @@ func (cs *ChainStore) SetCheckpoint(ctx context.Context, ts *types.TipSet) error
|
|||||||
}
|
}
|
||||||
|
|
||||||
if !anc {
|
if !anc {
|
||||||
return xerrors.Errorf("cannot mark tipset as checkpoint, since it isn't in the main-chain: %w", err)
|
if err := cs.takeHeaviestTipSet(ctx, ts); err != nil {
|
||||||
|
return xerrors.Errorf("failed to switch chains when setting checkpoint: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
err = cs.metadataDs.Put(ctx, checkpointKey, tskBytes)
|
err = cs.metadataDs.Put(ctx, checkpointKey, tskBytes)
|
||||||
|
Loading…
Reference in New Issue
Block a user