wait for pre-upgrades to exit on stop

This commit is contained in:
Steven Allen 2021-01-26 15:11:31 -08:00
parent f65d179f2c
commit 77117a0be5
2 changed files with 21 additions and 6 deletions
chain/stmgr

View File

@ -5,6 +5,7 @@ import (
"context" "context"
"encoding/binary" "encoding/binary"
"sort" "sort"
"sync"
"github.com/filecoin-project/go-state-types/rt" "github.com/filecoin-project/go-state-types/rt"
@ -243,6 +244,8 @@ func (sm *StateManager) hasExpensiveFork(ctx context.Context, height abi.ChainEp
} }
func (sm *StateManager) preMigrationWorker(ctx context.Context) { func (sm *StateManager) preMigrationWorker(ctx context.Context) {
defer close(sm.shutdown)
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
@ -252,6 +255,9 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
run func(ts *types.TipSet) run func(ts *types.TipSet)
} }
var wg sync.WaitGroup
defer wg.Wait()
// Turn each pre-migration into an operation in a schedule. // Turn each pre-migration into an operation in a schedule.
var schedule []op var schedule []op
for upgradeEpoch, migration := range sm.stateMigrations { for upgradeEpoch, migration := range sm.stateMigrations {
@ -270,7 +276,9 @@ func (sm *StateManager) preMigrationWorker(ctx context.Context) {
// TODO: are these values correct? // TODO: are these values correct?
run: func(ts *types.TipSet) { run: func(ts *types.TipSet) {
wg.Add(1)
go func() { go func() {
defer wg.Done()
err := migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts) err := migrationFunc(preCtx, sm, cache, ts.ParentState(), ts.Height(), ts)
if err != nil { if err != nil {
log.Errorw("failed to run pre-migration", log.Errorw("failed to run pre-migration",

View File

@ -72,8 +72,8 @@ type migration struct {
type StateManager struct { type StateManager struct {
cs *store.ChainStore cs *store.ChainStore
ctx context.Context cancel context.CancelFunc
cancel context.CancelFunc shutdown chan struct{}
// Determines the network version at any given epoch. // Determines the network version at any given epoch.
networkVersions []versionSpec networkVersions []versionSpec
@ -168,17 +168,24 @@ func cidsToKey(cids []cid.Cid) string {
// //
// This is method is not safe to invoke from multiple threads or concurrently with Stop. // This is method is not safe to invoke from multiple threads or concurrently with Stop.
func (sm *StateManager) Start(context.Context) error { func (sm *StateManager) Start(context.Context) error {
sm.ctx, sm.cancel = context.WithCancel(context.Background()) var ctx context.Context
go sm.preMigrationWorker(sm.ctx) ctx, sm.cancel = context.WithCancel(context.Background())
sm.shutdown = make(chan struct{})
go sm.preMigrationWorker(ctx)
return nil return nil
} }
// Stop starts the state manager's background processes. // Stop starts the state manager's background processes.
// //
// This is method is not safe to invoke from multiple threads or concurrently with Start. // This is method is not safe to invoke concurrently with Start.
func (sm *StateManager) Stop(context.Context) error { func (sm *StateManager) Stop(ctx context.Context) error {
if sm.cancel != nil { if sm.cancel != nil {
sm.cancel() sm.cancel()
select {
case <-sm.shutdown:
case <-ctx.Done():
return ctx.Err()
}
} }
return nil return nil
} }