make pre-migrations async

This commit is contained in:
Steven Allen 2021-01-26 15:05:12 -08:00
parent 8d05c5d62c
commit f65d179f2c

View File

@ -73,7 +73,7 @@ type PreUpgradeFunc func(
// are optimizations, are not guaranteed to run, and may be canceled and/or run multiple times. // are optimizations, are not guaranteed to run, and may be canceled and/or run multiple times.
type PreUpgrade struct { type PreUpgrade struct {
// PreUpgrade is the pre-migration function to run at the specified time. This function is // PreUpgrade is the pre-migration function to run at the specified time. This function is
// expected to run asynchronously and must abort promptly when canceled. // run asynchronously and must abort promptly when canceled.
PreUpgrade PreUpgradeFunc PreUpgrade PreUpgradeFunc
// When specifies that this pre-migration should be started at most When epochs before the upgrade. // When specifies that this pre-migration should be started at most When epochs before the upgrade.
@ -249,7 +249,7 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
type op struct { type op struct {
after abi.ChainEpoch after abi.ChainEpoch
notAfter abi.ChainEpoch notAfter abi.ChainEpoch
run func(ts *types.TipSet) error run func(ts *types.TipSet)
} }
// Turn each pre-migration into an operation in a schedule. // Turn each pre-migration into an operation in a schedule.
@ -269,8 +269,14 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
notAfter: notAfterEpoch, notAfter: notAfterEpoch,
// TODO: are these values correct? // TODO: are these values correct?
run: func(ts *types.TipSet) error { run: func(ts *types.TipSet) {
return migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts) go func() {
err := migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts)
if err != nil {
log.Errorw("failed to run pre-migration",
"error", err)
}
}()
}, },
}) })
@ -278,10 +284,7 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
schedule = append(schedule, op{ schedule = append(schedule, op{
after: notAfterEpoch, after: notAfterEpoch,
notAfter: -1, notAfter: -1,
run: func(ts *types.TipSet) error { run: func(ts *types.TipSet) { preCancel() },
preCancel()
return nil
},
}) })
} }
} }
@ -302,10 +305,7 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
// If we haven't passed the pre-migration height... // If we haven't passed the pre-migration height...
if op.notAfter < 0 || head.Val.Height() <= op.notAfter { if op.notAfter < 0 || head.Val.Height() <= op.notAfter {
err := op.run(head.Val) op.run(head.Val)
if err != nil {
log.Errorw("failed to run pre-migration", "error", err)
}
} }
schedule = schedule[1:] schedule = schedule[1:]
} }