Merge pull request #6495 from filecoin-project/asr/cherry-pick-rs
sealing: Fix restartSectors race
This commit is contained in:
commit
0319ccbc4d
3
extern/storage-sealing/fsm.go
vendored
3
extern/storage-sealing/fsm.go
vendored
@ -522,6 +522,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, err
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) restartSectors(ctx context.Context) error {
|
func (m *Sealing) restartSectors(ctx context.Context) error {
|
||||||
|
defer m.startupWait.Done()
|
||||||
|
|
||||||
trackedSectors, err := m.ListSectors()
|
trackedSectors, err := m.ListSectors()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Errorf("loading sector list: %+v", err)
|
log.Errorf("loading sector list: %+v", err)
|
||||||
@ -539,6 +541,7 @@ func (m *Sealing) restartSectors(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
|
func (m *Sealing) ForceSectorState(ctx context.Context, id abi.SectorNumber, state SectorState) error {
|
||||||
|
m.startupWait.Wait()
|
||||||
return m.sectors.Send(id, SectorForceState{state})
|
return m.sectors.Send(id, SectorForceState{state})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
2
extern/storage-sealing/garbage.go
vendored
2
extern/storage-sealing/garbage.go
vendored
@ -9,6 +9,8 @@ import (
|
|||||||
)
|
)
|
||||||
|
|
||||||
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
|
func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
|
||||||
|
m.startupWait.Wait()
|
||||||
|
|
||||||
m.inputLk.Lock()
|
m.inputLk.Lock()
|
||||||
defer m.inputLk.Unlock()
|
defer m.inputLk.Unlock()
|
||||||
|
|
||||||
|
3
extern/storage-sealing/input.go
vendored
3
extern/storage-sealing/input.go
vendored
@ -394,6 +394,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
|
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
|
||||||
|
m.startupWait.Wait()
|
||||||
if m.creating != nil {
|
if m.creating != nil {
|
||||||
return nil // new sector is being created right now
|
return nil // new sector is being created right now
|
||||||
}
|
}
|
||||||
@ -446,7 +447,9 @@ func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
||||||
|
m.startupWait.Wait()
|
||||||
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
|
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
|
||||||
|
|
||||||
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
7
extern/storage-sealing/sealing.go
vendored
7
extern/storage-sealing/sealing.go
vendored
@ -83,6 +83,8 @@ type Sealing struct {
|
|||||||
feeCfg config.MinerFeeConfig
|
feeCfg config.MinerFeeConfig
|
||||||
events Events
|
events Events
|
||||||
|
|
||||||
|
startupWait sync.WaitGroup
|
||||||
|
|
||||||
maddr address.Address
|
maddr address.Address
|
||||||
|
|
||||||
sealer sectorstorage.SectorManager
|
sealer sectorstorage.SectorManager
|
||||||
@ -162,6 +164,7 @@ func New(api SealingAPI, fc config.MinerFeeConfig, events Events, maddr address.
|
|||||||
bySector: map[abi.SectorID]statSectorState{},
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
|
s.startupWait.Add(1)
|
||||||
|
|
||||||
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
|
||||||
@ -189,10 +192,14 @@ func (m *Sealing) Stop(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||||
|
m.startupWait.Wait()
|
||||||
|
|
||||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
|
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
|
||||||
|
m.startupWait.Wait()
|
||||||
|
|
||||||
return m.sectors.Send(uint64(sid), SectorTerminate{})
|
return m.sectors.Send(uint64(sid), SectorTerminate{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user