wire up re-migration logic for nv10

This commit is contained in:
Steven Allen 2021-01-26 14:57:38 -08:00
parent 8d3cc632ac
commit 8986a2002d

View File

@ -67,7 +67,7 @@ type PreUpgradeFunc func(
sm *StateManager, cache MigrationCache,
oldState cid.Cid,
height abi.ChainEpoch, ts *types.TipSet,
)
) error
// PreUpgrade describes a pre-migration step to prepare for a network state upgrade. Pre-migrations
// are optimizations, are not guaranteed to run, and may be canceled and/or run multiple times.
@ -76,13 +76,11 @@ type PreUpgrade struct {
// expected to run asynchronously and must abort promptly when canceled.
PreUpgrade PreUpgradeFunc
// After specifies that this pre-migration should be started _after_ the given epoch.
//
// In case of chain reverts, the pre-migration will not be canceled and the state will not
// be reverted.
After abi.ChainEpoch
// When specifies that this pre-migration should be started at most When epochs before the upgrade.
When abi.ChainEpoch
// NotAfter specifies that this pre-migration should not be started after the given epoch.
// NotAfter specifies that this pre-migration must not be started later than NotAfter
// epochs before the final upgrade epoch.
//
// This should be set such that the pre-migration is likely to complete at least 5 epochs
// before the next pre-migration and/or upgrade epoch hits.
@ -170,6 +168,15 @@ func DefaultUpgradeSchedule() UpgradeSchedule {
Height: build.UpgradeActorsV3Height,
Network: network.Version10,
Migration: UpgradeActorsV3,
PreUpgrades: []PreUpgrade{{
PreUpgrade: PreUpgradeActorsV3,
When: 120,
NotAfter: 60,
}, {
PreUpgrade: PreUpgradeActorsV3,
When: 30,
NotAfter: 20,
}},
Expensive: true,
}}
@ -234,48 +241,66 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
defer cancel()
type op struct {
epoch abi.ChainEpoch
run func(ts *types.TipSet)
after abi.ChainEpoch
notAfter abi.ChainEpoch
run func(ts *types.TipSet) error
}
// Turn each pre-migration into an operation in a schedule.
var schedule []op
for _, migration := range sm.stateMigrations {
for upgradeEpoch, migration := range sm.stateMigrations {
cache := migration.cache
for _, prem := range migration.preMigrations {
// TODO: validate the schedule up front (e.g. that after < notAfter for each pre-migration).
preCtx, preCancel := context.WithCancel(ctx)
migrationFunc := prem.PreUpgrade
afterEpoch := upgradeEpoch - prem.When
notAfterEpoch := upgradeEpoch - prem.NotAfter
// Add an op to start a pre-migration.
schedule = append(schedule, op{
epoch: prem.After,
after: afterEpoch,
notAfter: notAfterEpoch,
// TODO: are these values correct?
run: func(ts *types.TipSet) { migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts) },
run: func(ts *types.TipSet) error {
return migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts)
},
})
// Add an op to cancle the pre-migration if it's still running.
// Add an op to cancel the pre-migration if it's still running.
schedule = append(schedule, op{
epoch: prem.NotAfter,
run: func(ts *types.TipSet) { preCancel() },
after: notAfterEpoch,
notAfter: -1,
run: func(ts *types.TipSet) error {
preCancel()
return nil
},
})
}
}
// Then sort by epoch.
sort.Slice(schedule, func(i, j int) bool {
return schedule[i].epoch < schedule[j].epoch
return schedule[i].after < schedule[j].after
})
// Finally, when the head changes, see if there's anything we need to do.
for change := range sm.cs.SubHeadChanges(ctx) {
for _, head := range change {
for len(schedule) > 0 {
if head.Val.Height() < schedule[0].epoch {
op := &schedule[0]
if head.Val.Height() < op.after {
break
}
schedule[0].run(head.Val)
// If we haven't passed the pre-migration height...
if op.notAfter < 0 || head.Val.Height() <= op.notAfter {
err := op.run(head.Val)
if err != nil {
log.Errorw("failed to run pre-migration", "error", err)
}
}
schedule = schedule[1:]
}
}
@ -804,12 +829,45 @@ func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb E
return newRoot, nil
}
func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
// UpgradeActorsV3 performs the nv10 (actors v3) state migration at the upgrade
// epoch and sanity-checks the resulting state tree. The cache carries results
// from any earlier PreUpgradeActorsV3 runs so already-migrated sub-trees are
// not re-migrated.
func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) {
	// TODO: tune this.
	config := nv10.Config{MaxWorkers: 1}
	newRoot, err := upgradeActorsV3Common(ctx, sm, cache, root, epoch, ts, config)
	if err != nil {
		// NOTE: the previous message ("failed to persist new state root") duplicated
		// the persist-path wrap inside upgradeActorsV3Common; describe this step instead.
		return cid.Undef, xerrors.Errorf("migrating actors v3 state: %w", err)
	}

	// Perform some basic sanity checks to make sure everything still works.
	store := store.ActorStore(ctx, sm.ChainStore().Blockstore())
	if newSm, err := state.LoadStateTree(store, newRoot); err != nil {
		return cid.Undef, xerrors.Errorf("state tree sanity load failed: %w", err)
	} else if newRoot2, err := newSm.Flush(ctx); err != nil {
		return cid.Undef, xerrors.Errorf("state tree sanity flush failed: %w", err)
	} else if newRoot2 != newRoot {
		return cid.Undef, xerrors.Errorf("state-root mismatch: %s != %s", newRoot, newRoot2)
	} else if _, err := newSm.GetActor(init_.Address); err != nil {
		return cid.Undef, xerrors.Errorf("failed to load init actor after upgrade: %w", err)
	}

	return newRoot, nil
}
// PreUpgradeActorsV3 speculatively runs the nv10 (actors v3) state migration
// ahead of the upgrade epoch so that its work is captured in cache and can be
// reused by UpgradeActorsV3. The migrated root itself is discarded; only the
// cache side effect matters. Per the PreUpgrade contract, this is best-effort:
// it may be canceled (via ctx) and/or run multiple times.
func PreUpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error {
// TODO: tune this.
config := nv10.Config{MaxWorkers: 1}
_, err := upgradeActorsV3Common(ctx, sm, cache, root, epoch, ts, config)
return err
}
func upgradeActorsV3Common(
ctx context.Context, sm *StateManager, cache MigrationCache,
root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet,
config nv10.Config,
) (cid.Cid, error) {
buf := bufbstore.NewTieredBstore(sm.cs.Blockstore(), bstore.NewTemporarySync())
store := store.ActorStore(ctx, buf)
// Load the state root.
var stateRoot types.StateRoot
if err := store.Get(ctx, root, &stateRoot); err != nil {
return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err)
@ -823,18 +881,12 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb
}
// Perform the migration
// TODO: store this somewhere and pre-migrate
cache := nv10.NewMemMigrationCache()
// TODO: tune this.
config := nv10.Config{MaxWorkers: 1}
newHamtRoot, err := nv10.MigrateStateTree(ctx, store, stateRoot.Actors, epoch, config, migrationLogger{}, cache)
if err != nil {
return cid.Undef, xerrors.Errorf("upgrading to actors v2: %w", err)
}
// Persist the result.
newRoot, err := store.Put(ctx, &types.StateRoot{
Version: types.StateTreeVersion2,
Actors: newHamtRoot,
@ -844,19 +896,6 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb
return cid.Undef, xerrors.Errorf("failed to persist new state root: %w", err)
}
// Check the result.
// perform some basic sanity checks to make sure everything still works.
if newSm, err := state.LoadStateTree(store, newRoot); err != nil {
return cid.Undef, xerrors.Errorf("state tree sanity load failed: %w", err)
} else if newRoot2, err := newSm.Flush(ctx); err != nil {
return cid.Undef, xerrors.Errorf("state tree sanity flush failed: %w", err)
} else if newRoot2 != newRoot {
return cid.Undef, xerrors.Errorf("state-root mismatch: %s != %s", newRoot, newRoot2)
} else if _, err := newSm.GetActor(init_.Address); err != nil {
return cid.Undef, xerrors.Errorf("failed to load init actor after upgrade: %w", err)
}
// Persist the new tree.
{