fsm: Config for limiting max sealing sectors
This commit is contained in:
parent
65ffde9e4b
commit
6a49bd6d8e
2
extern/storage-sealing/fsm.go
vendored
2
extern/storage-sealing/fsm.go
vendored
@ -225,6 +225,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
|
m.stats.updateSector(m.minerSector(state.SectorNumber), state.State)
|
||||||
|
|
||||||
switch state.State {
|
switch state.State {
|
||||||
// Happy path
|
// Happy path
|
||||||
case Empty:
|
case Empty:
|
||||||
|
19
extern/storage-sealing/sealing.go
vendored
19
extern/storage-sealing/sealing.go
vendored
@ -81,6 +81,8 @@ type Sealing struct {
|
|||||||
upgradeLk sync.Mutex
|
upgradeLk sync.Mutex
|
||||||
toUpgrade map[abi.SectorNumber]struct{}
|
toUpgrade map[abi.SectorNumber]struct{}
|
||||||
|
|
||||||
|
stats SectorStats
|
||||||
|
|
||||||
getConfig GetSealingConfigFunc
|
getConfig GetSealingConfigFunc
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -268,6 +270,12 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
|
|||||||
return 0, xerrors.Errorf("getting config: %w", err)
|
return 0, xerrors.Errorf("getting config: %w", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectors > 0 {
|
||||||
|
if m.stats.curSealing() > cfg.MaxSealingSectors {
|
||||||
|
return 0, xerrors.Errorf("too many sectors sealing")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if cfg.MaxWaitDealsSectors > 0 {
|
if cfg.MaxWaitDealsSectors > 0 {
|
||||||
// run in a loop because we have to drop the map lock here for a bit
|
// run in a loop because we have to drop the map lock here for a bit
|
||||||
tries := 0
|
tries := 0
|
||||||
@ -353,6 +361,17 @@ func (m *Sealing) newDealSector() (abi.SectorNumber, error) {
|
|||||||
|
|
||||||
// newSectorCC accepts a slice of pieces with no deal (junk data)
|
// newSectorCC accepts a slice of pieces with no deal (junk data)
|
||||||
func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error {
|
func (m *Sealing) newSectorCC(sid abi.SectorNumber, pieces []Piece) error {
|
||||||
|
cfg, err := m.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting config: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectors > 0 {
|
||||||
|
if m.stats.curSealing() > cfg.MaxSealingSectors {
|
||||||
|
return xerrors.Errorf("too many sectors sealing (curSealing: %d, max: %d)", m.stats.curSealing(), cfg.MaxSealingSectors)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
|
rt, err := ffiwrapper.SealProofTypeFromSectorSize(m.sealer.SectorSize())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("bad sector size: %w", err)
|
return xerrors.Errorf("bad sector size: %w", err)
|
||||||
|
11
extern/storage-sealing/sector_state.go
vendored
11
extern/storage-sealing/sector_state.go
vendored
@ -36,3 +36,14 @@ const (
|
|||||||
RemoveFailed SectorState = "RemoveFailed"
|
RemoveFailed SectorState = "RemoveFailed"
|
||||||
Removed SectorState = "Removed"
|
Removed SectorState = "Removed"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func toStatState(st SectorState) statSectorState {
|
||||||
|
switch st {
|
||||||
|
case Empty, WaitDeals, Packing, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, CommitWait, FinalizeSector:
|
||||||
|
return sstSealing
|
||||||
|
case Proving, Removed, Removing:
|
||||||
|
return sstProving
|
||||||
|
}
|
||||||
|
|
||||||
|
return sstFailed
|
||||||
|
}
|
||||||
|
45
extern/storage-sealing/stats.go
vendored
Normal file
45
extern/storage-sealing/stats.go
vendored
Normal file
@ -0,0 +1,45 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"sync"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/abi"
|
||||||
|
)
|
||||||
|
|
||||||
|
type statSectorState int
|
||||||
|
const (
|
||||||
|
sstSealing statSectorState = iota
|
||||||
|
sstFailed
|
||||||
|
sstProving
|
||||||
|
nsst
|
||||||
|
)
|
||||||
|
|
||||||
|
type SectorStats struct {
|
||||||
|
lk sync.Mutex
|
||||||
|
|
||||||
|
bySector map[abi.SectorID]statSectorState
|
||||||
|
totals [nsst]uint64
|
||||||
|
}
|
||||||
|
|
||||||
|
func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
oldst, found := ss.bySector[id]
|
||||||
|
if found {
|
||||||
|
ss.totals[oldst]--
|
||||||
|
}
|
||||||
|
|
||||||
|
sst := toStatState(st)
|
||||||
|
ss.bySector[id] = sst
|
||||||
|
ss.totals[sst]++
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
// return the number of sectors currently in the sealing pipeline
|
||||||
|
func (ss *SectorStats) curSealing() uint64 {
|
||||||
|
ss.lk.Lock()
|
||||||
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
return ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user