diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index ddfebad24..4842e6023 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -225,6 +225,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta */ + m.stats.updateSector(m.minerSector(state.SectorNumber), state.State) + switch state.State { // Happy path case Empty: diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 7ae2e5163..3ae75e189 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -81,6 +81,8 @@ type Sealing struct { upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} + stats SectorStats + getConfig GetSealingConfigFunc } @@ -268,6 +270,12 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { return 0, xerrors.Errorf("getting config: %w", err) } + if cfg.MaxSealingSectors > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectors { + return 0, xerrors.Errorf("too many sectors sealing") + } + } + if cfg.MaxWaitDealsSectors > 0 { // run in a loop because we have to drop the map lock here for a bit tries := 0 @@ -353,6 +361,17 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) { // newSectorCC accepts a slice of pieces with no deal (junk data) func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error { + cfg, err := m.getConfig() + if err != nil { + return xerrors.Errorf("getting config: %w", err) + } + + if cfg.MaxSealingSectors > 0 { + if m.stats.curSealing() > cfg.MaxSealingSectors { + return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors) + } + } + rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize()) if err != nil { return xerrors.Errorf("bad sector size: %w", err) diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index f2801c9fc..2f57d83e8 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -36,3 +36,14 @@ const ( RemoveFailed SectorState = "RemoveFailed" Removed SectorState = "Removed" ) + +func toStatState(st SectorState) statSectorState { + switch st { + case Empty, WaitDeals, Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitWait, FinalizeSector: + return sstSealing + case Proving, Removed, Removing: + return sstProving + } + + return sstFailed +} diff --git a/extern/storage-sealing/stats.go b/extern/storage-sealing/stats.go new file mode 100644 index 000000000..3d31a39ff --- /dev/null +++ b/extern/storage-sealing/stats.go @@ -0,0 +1,45 @@ +package sealing + +import ( + "sync" + + "github.com/filecoin-project/specs-actors/actors/abi" +) + +type statSectorState int +const ( + sstSealing statSectorState = iota + sstFailed + sstProving + nsst +) + +type SectorStats struct { + lk sync.Mutex + + bySector map[abi.SectorID]statSectorState + totals [nsst]uint64 +} + +func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) { + ss.lk.Lock() + defer ss.lk.Unlock() + + oldst, found := ss.bySector[id] + if found { + ss.totals[oldst]-- + } + + sst := toStatState(st) + ss.bySector[id] = sst + ss.totals[sst]++ +} + + +// return the number of sectors currently in the sealing pipeline +func (ss *SectorStats) curSealing() uint64 { + ss.lk.Lock() + defer ss.lk.Unlock() + + return ss.totals[sstSealing] + ss.totals[sstFailed] +}