From 8986a2002d686ff0ff5bb69c9f52e159f6d0eac7 Mon Sep 17 00:00:00 2001 From: Steven Allen Date: Tue, 26 Jan 2021 14:57:38 -0800 Subject: [PATCH] wire up re-migration logic for nv10 --- chain/stmgr/forks.go | 119 ++++++++++++++++++++++++++++--------------- 1 file changed, 79 insertions(+), 40 deletions(-) diff --git a/chain/stmgr/forks.go b/chain/stmgr/forks.go index 8db1b9de4..90c643862 100644 --- a/chain/stmgr/forks.go +++ b/chain/stmgr/forks.go @@ -67,7 +67,7 @@ type PreUpgradeFunc func( sm *StateManager, cache MigrationCache, oldState cid.Cid, height abi.ChainEpoch, ts *types.TipSet, -) +) error // PreUpgrade describes a pre-migration step to prepare for a network state upgrade. Pre-migrations // are optimizations, are not guaranteed to run, and may be canceled and/or run multiple times. @@ -76,13 +76,11 @@ type PreUpgrade struct { // expected to run asynchronously and must abort promptly when canceled. PreUpgrade PreUpgradeFunc - // After specifies that this pre-migration should be started _after_ the given epoch. - // - // In case of chain reverts, the pre-migration will not be canceled and the state will not - // be reverted. - After abi.ChainEpoch + // When specifies that this pre-migration should be started at most When epochs before the upgrade. + When abi.ChainEpoch - // NotAfter specifies that this pre-migration should not be started after the given epoch. + // NotAfter specifies that this pre-migration should not be started NotAfter epochs before + // the final upgrade epoch. // // This should be set such that the pre-migration is likely to complete at least 5 epochs // before the next pre-migration and/or upgrade epoch hits. @@ -170,6 +168,15 @@ func DefaultUpgradeSchedule() UpgradeSchedule { Height: build.UpgradeActorsV3Height, Network: network.Version10, Migration: UpgradeActorsV3, + PreUpgrades: []PreUpgrade{{ + PreUpgrade: PreUpgradeActorsV3, + When: 120, + NotAfter: 60, + }, { + PreUpgrade: PreUpgradeActorsV3, + When: 30, + NotAfter: 20, + }}, Expensive: true, }} @@ -234,48 +241,66 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) { defer cancel() type op struct { - epoch abi.ChainEpoch - run func(ts *types.TipSet) + after abi.ChainEpoch + notAfter abi.ChainEpoch + run func(ts *types.TipSet) error } // Turn each pre-migration into an operation in a schedule. var schedule []op - for _, migration := range sm.stateMigrations { + for upgradeEpoch, migration := range sm.stateMigrations { cache := migration.cache for _, prem := range migration.preMigrations { - // TODO: make sure the schedule makes sense after < notafter, etc. preCtx, preCancel := context.WithCancel(ctx) migrationFunc := prem.PreUpgrade + afterEpoch := upgradeEpoch - prem.When + notAfterEpoch := upgradeEpoch - prem.NotAfter + // Add an op to start a pre-migration. schedule = append(schedule, op{ - epoch: prem.After, + after: afterEpoch, + notAfter: notAfterEpoch, // TODO: are these values correct? - run: func(ts *types.TipSet) { migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts) }, + run: func(ts *types.TipSet) error { + return migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts) + }, }) - // Add an op to cancle the pre-migration if it's still running. + // Add an op to cancel the pre-migration if it's still running. schedule = append(schedule, op{ - epoch: prem.NotAfter, - run: func(ts *types.TipSet) { preCancel() }, + after: notAfterEpoch, + notAfter: -1, + run: func(ts *types.TipSet) error { + preCancel() + return nil + }, }) } } // Then sort by epoch. sort.Slice(schedule, func(i, j int) bool { - return schedule[i].epoch < schedule[j].epoch + return schedule[i].after < schedule[j].after }) // Finally, when the head changes, see if there's anything we need to do. for change := range sm.cs.SubHeadChanges(ctx) { for _, head := range change { for len(schedule) > 0 { - if head.Val.Height() < schedule[0].epoch { + op := &schedule[0] + if head.Val.Height() < op.after { break } - schedule[0].run(head.Val) + + // If we haven't passed the pre-migration height... + if op.notAfter < 0 || head.Val.Height() <= op.notAfter { + err := op.run(head.Val) + if err != nil { + log.Errorw("failed to run pre-migration", "error", err) + } + } schedule = schedule[1:] } } @@ -804,12 +829,45 @@ func UpgradeCalico(ctx context.Context, sm *StateManager, _ MigrationCache, cb E return newRoot, nil } -func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { +func UpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, cb ExecCallback, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) (cid.Cid, error) { + // TODO: tune this. + config := nv10.Config{MaxWorkers: 1} + newRoot, err := upgradeActorsV3Common(ctx, sm, cache, root, epoch, ts, config) + if err != nil { + return cid.Undef, xerrors.Errorf("failed to persist new state root: %w", err) + } + + // perform some basic sanity checks to make sure everything still works. + store := store.ActorStore(ctx, sm.ChainStore().Blockstore()) + if newSm, err := state.LoadStateTree(store, newRoot); err != nil { + return cid.Undef, xerrors.Errorf("state tree sanity load failed: %w", err) + } else if newRoot2, err := newSm.Flush(ctx); err != nil { + return cid.Undef, xerrors.Errorf("state tree sanity flush failed: %w", err) + } else if newRoot2 != newRoot { + return cid.Undef, xerrors.Errorf("state-root mismatch: %s != %s", newRoot, newRoot2) + } else if _, err := newSm.GetActor(init_.Address); err != nil { + return cid.Undef, xerrors.Errorf("failed to load init actor after upgrade: %w", err) + } + + return newRoot, nil +} + +func PreUpgradeActorsV3(ctx context.Context, sm *StateManager, cache MigrationCache, root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet) error { + // TODO: tune this. + config := nv10.Config{MaxWorkers: 1} + _, err := upgradeActorsV3Common(ctx, sm, cache, root, epoch, ts, config) + return err +} + +func upgradeActorsV3Common( + ctx context.Context, sm *StateManager, cache MigrationCache, + root cid.Cid, epoch abi.ChainEpoch, ts *types.TipSet, + config nv10.Config, +) (cid.Cid, error) { buf := bufbstore.NewTieredBstore(sm.cs.Blockstore(), bstore.NewTemporarySync()) store := store.ActorStore(ctx, buf) // Load the state root. - var stateRoot types.StateRoot if err := store.Get(ctx, root, &stateRoot); err != nil { return cid.Undef, xerrors.Errorf("failed to decode state root: %w", err) @@ -823,18 +881,12 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb } // Perform the migration - - // TODO: store this somewhere and pre-migrate - cache := nv10.NewMemMigrationCache() - // TODO: tune this. - config := nv10.Config{MaxWorkers: 1} newHamtRoot, err := nv10.MigrateStateTree(ctx, store, stateRoot.Actors, epoch, config, migrationLogger{}, cache) if err != nil { return cid.Undef, xerrors.Errorf("upgrading to actors v2: %w", err) } // Persist the result. - newRoot, err := store.Put(ctx, &types.StateRoot{ Version: types.StateTreeVersion2, Actors: newHamtRoot, @@ -844,19 +896,6 @@ func UpgradeActorsV3(ctx context.Context, sm *StateManager, _ MigrationCache, cb return cid.Undef, xerrors.Errorf("failed to persist new state root: %w", err) } - // Check the result. - - // perform some basic sanity checks to make sure everything still works. - if newSm, err := state.LoadStateTree(store, newRoot); err != nil { - return cid.Undef, xerrors.Errorf("state tree sanity load failed: %w", err) - } else if newRoot2, err := newSm.Flush(ctx); err != nil { - return cid.Undef, xerrors.Errorf("state tree sanity flush failed: %w", err) - } else if newRoot2 != newRoot { - return cid.Undef, xerrors.Errorf("state-root mismatch: %s != %s", newRoot, newRoot2) - } else if _, err := newSm.GetActor(init_.Address); err != nil { - return cid.Undef, xerrors.Errorf("failed to load init actor after upgrade: %w", err) - } - // Persist the new tree. {