storagefsm: Trigger input processing wheen below limits
This commit is contained in:
parent
06aaa668d6
commit
f5ed25371b
31
extern/storage-sealing/fsm.go
vendored
31
extern/storage-sealing/fsm.go
vendored
@ -300,7 +300,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
|
|
||||||
*/
|
*/
|
||||||
|
|
||||||
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
|
if err := m.onUpdateSector(context.TODO(), state); err != nil {
|
||||||
|
log.Errorw("update sector stats", "error", err)
|
||||||
|
}
|
||||||
|
|
||||||
switch state.State {
|
switch state.State {
|
||||||
// Happy path
|
// Happy path
|
||||||
@ -391,6 +393,33 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
return nil, processed, nil
|
return nil, processed, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {
|
||||||
|
cfg, err := m.getConfig()
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting config: %w", err)
|
||||||
|
}
|
||||||
|
sp, err := m.currentSealProof(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return xerrors.Errorf("getting seal proof type: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State)
|
||||||
|
|
||||||
|
// trigger more input processing when we've dipped below max sealing limits
|
||||||
|
if shouldUpdateInput {
|
||||||
|
go func() {
|
||||||
|
m.inputLk.Unlock()
|
||||||
|
defer m.inputLk.Unlock()
|
||||||
|
|
||||||
|
if err := m.updateInput(ctx, sp); err != nil {
|
||||||
|
log.Errorf("%+v", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||||
for i, event := range events {
|
for i, event := range events {
|
||||||
switch e := event.User.(type) {
|
switch e := event.User.(type) {
|
||||||
|
25
extern/storage-sealing/stats.go
vendored
25
extern/storage-sealing/stats.go
vendored
@ -4,6 +4,7 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
|
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
type statSectorState int
|
type statSectorState int
|
||||||
@ -23,10 +24,14 @@ type SectorStats struct {
|
|||||||
totals [nsst]uint64
|
totals [nsst]uint64
|
||||||
}
|
}
|
||||||
|
|
||||||
func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
|
||||||
ss.lk.Lock()
|
ss.lk.Lock()
|
||||||
defer ss.lk.Unlock()
|
defer ss.lk.Unlock()
|
||||||
|
|
||||||
|
preSealing := ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||||
|
preStaging := ss.totals[sstStaging]
|
||||||
|
|
||||||
|
// update totals
|
||||||
oldst, found := ss.bySector[id]
|
oldst, found := ss.bySector[id]
|
||||||
if found {
|
if found {
|
||||||
ss.totals[oldst]--
|
ss.totals[oldst]--
|
||||||
@ -35,6 +40,24 @@ func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
|||||||
sst := toStatState(st)
|
sst := toStatState(st)
|
||||||
ss.bySector[id] = sst
|
ss.bySector[id] = sst
|
||||||
ss.totals[sst]++
|
ss.totals[sst]++
|
||||||
|
|
||||||
|
// check if we may need be able to process more deals
|
||||||
|
sealing := ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||||
|
staging := ss.totals[sstStaging]
|
||||||
|
|
||||||
|
if cfg.MaxSealingSectorsForDeals > 0 && // max sealing deal sector limit set
|
||||||
|
preSealing >= cfg.MaxSealingSectorsForDeals && // we were over limit
|
||||||
|
sealing < cfg.MaxSealingSectorsForDeals { // and we're below the limit now
|
||||||
|
updateInput = true
|
||||||
|
}
|
||||||
|
|
||||||
|
if cfg.MaxWaitDealsSectors > 0 && // max waiting deal sector limit set
|
||||||
|
preStaging >= cfg.MaxWaitDealsSectors && // we were over limit
|
||||||
|
staging < cfg.MaxWaitDealsSectors { // and we're below the limit now
|
||||||
|
updateInput = true
|
||||||
|
}
|
||||||
|
|
||||||
|
return updateInput
|
||||||
}
|
}
|
||||||
|
|
||||||
// return the number of sectors currently in the sealing pipeline
|
// return the number of sectors currently in the sealing pipeline
|
||||||
|
Loading…
Reference in New Issue
Block a user