From 74db586fdfaecdac1ed4017fcbb25e7b362786a3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 15 Jun 2021 21:04:11 +0200 Subject: [PATCH] sealing: Fix restartSectors race --- extern/storage-sealing/fsm.go | 3 +++ extern/storage-sealing/garbage.go | 2 ++ extern/storage-sealing/input.go | 3 +++ extern/storage-sealing/sealing.go | 7 +++++++ 4 files changed, 15 insertions(+) diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index e899701cc..359c49eb3 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -522,6 +522,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err } func (m *Sealing) restartSectors(ctx context.Context) error { + defer m.startupWait.Done() + trackedSectors, err := m.ListSectors() if err != nil { log.Errorf("loading sector list: %+v", err) @@ -539,6 +541,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error { } func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error { + m.startupWait.Wait() return m.sectors.Send(id, SectorForceState{state}) } diff --git a/extern/storage-sealing/garbage.go b/extern/storage-sealing/garbage.go index c8ec21a84..d429b5b43 100644 --- a/extern/storage-sealing/garbage.go +++ b/extern/storage-sealing/garbage.go @@ -9,6 +9,8 @@ import ( ) func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) { + m.startupWait.Wait() + m.inputLk.Lock() defer m.inputLk.Unlock() diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index bf66382d3..4a698ea1d 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -394,6 +394,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { + m.startupWait.Wait() if m.creating != nil { return nil // new sector is being created right now } @@ -446,7 +447,9 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi } func (m *Sealing) StartPacking(sid abi.SectorNumber) error { + m.startupWait.Wait() log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user") + return m.sectors.Send(uint64(sid), SectorStartPacking{}) } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 2d53223db..2019aa131 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -83,6 +83,8 @@ type Sealing struct { feeCfg config.MinerFeeConfig events Events + startupWait sync.WaitGroup + maddr address.Address sealer sectorstorage.SectorManager @@ -162,6 +164,7 @@ func New(api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address. bySector: map[abi.SectorID]statSectorState{}, }, } + s.startupWait.Add(1) s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) @@ -189,10 +192,14 @@ func (m *Sealing) Stop(ctx context.Context) error { } func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { + m.startupWait.Wait() + return m.sectors.Send(uint64(sid), SectorRemove{}) } func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { + m.startupWait.Wait() + return m.sectors.Send(uint64(sid), SectorTerminate{}) }