sealing: Fix restartSectors race

This commit is contained in:
Łukasz Magiera 2021-06-15 21:04:11 +02:00
parent fdd3a85033
commit 21ba7408dd
4 changed files with 16 additions and 0 deletions

View File

@ -514,6 +514,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
} }
func (m *Sealing) restartSectors(ctx context.Context) error { func (m *Sealing) restartSectors(ctx context.Context) error {
defer m.startupWait.Done()
trackedSectors, err := m.ListSectors() trackedSectors, err := m.ListSectors()
if err != nil { if err != nil {
log.Errorf("loading sector list: %+v", err) log.Errorf("loading sector list: %+v", err)
@ -531,6 +533,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
} }
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error { func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
m.startupWait.Wait()
return m.sectors.Send(id, SectorForceState{state}) return m.sectors.Send(id, SectorForceState{state})
} }

View File

@ -9,6 +9,8 @@ import (
) )
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) { func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
m.startupWait.Wait()
m.inputLk.Lock() m.inputLk.Lock()
defer m.inputLk.Unlock() defer m.inputLk.Unlock()

View File

@ -376,6 +376,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
} }
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
m.startupWait.Wait()
cfg, err := m.getConfig() cfg, err := m.getConfig()
if err != nil { if err != nil {
return xerrors.Errorf("getting storage config: %w", err) return xerrors.Errorf("getting storage config: %w", err)
@ -422,6 +424,8 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
} }
func (m *Sealing) StartPacking(sid abi.SectorNumber) error { func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorStartPacking{}) return m.sectors.Send(uint64(sid), SectorStartPacking{})
} }

View File

@ -83,6 +83,8 @@ type Sealing struct {
feeCfg config.MinerFeeConfig feeCfg config.MinerFeeConfig
events Events events Events
startupWait sync.WaitGroup
maddr address.Address maddr address.Address
sealer sectorstorage.SectorManager sealer sectorstorage.SectorManager
@ -161,6 +163,7 @@ func New(api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.
bySector: map[abi.SectorID]statSectorState{}, bySector: map[abi.SectorID]statSectorState{},
}, },
} }
s.startupWait.Add(1)
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
@ -188,10 +191,14 @@ func (m *Sealing) Stop(ctx context.Context) error {
} }
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorRemove{}) return m.sectors.Send(uint64(sid), SectorRemove{})
} }
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
m.startupWait.Wait()
return m.sectors.Send(uint64(sid), SectorTerminate{}) return m.sectors.Send(uint64(sid), SectorTerminate{})
} }