Merge pull request #5801 from filecoin-project/fix/fsm-input-stuck
storagefsm: Trigger input processing when below limits
This commit is contained in:
commit
1aea1ebb9d
@ -23,9 +23,11 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/market"
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
"github.com/filecoin-project/lotus/markets/storageadapter"
|
||||
"github.com/filecoin-project/lotus/node"
|
||||
"github.com/filecoin-project/lotus/node/impl"
|
||||
"github.com/filecoin-project/lotus/node/modules/dtypes"
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
ipld "github.com/ipfs/go-ipld-format"
|
||||
dag "github.com/ipfs/go-merkledag"
|
||||
@ -183,6 +185,71 @@ func TestPublishDealsBatching(t *testing.T, b APIBuilder, blocktime time.Duratio
|
||||
}
|
||||
}
|
||||
|
||||
func TestBatchDealInput(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
|
||||
publishPeriod := 10 * time.Second
|
||||
maxDealsPerMsg := uint64(4)
|
||||
|
||||
// Set max deals per publish deals message to maxDealsPerMsg
|
||||
minerDef := []StorageMiner{{
|
||||
Full: 0,
|
||||
Opts: node.Options(
|
||||
node.Override(
|
||||
new(*storageadapter.DealPublisher),
|
||||
storageadapter.NewDealPublisher(nil, storageadapter.PublishMsgConfig{
|
||||
Period: publishPeriod,
|
||||
MaxDealsPerMsg: maxDealsPerMsg,
|
||||
})),
|
||||
node.Override(new(dtypes.GetSealingConfigFunc), func() (dtypes.GetSealingConfigFunc, error) {
|
||||
return func() (sealiface.Config, error) {
|
||||
return sealiface.Config{
|
||||
MaxWaitDealsSectors: 1,
|
||||
MaxSealingSectors: 1,
|
||||
MaxSealingSectorsForDeals: 2,
|
||||
AlwaysKeepUnsealedCopy: true,
|
||||
}, nil
|
||||
}, nil
|
||||
}),
|
||||
),
|
||||
Preseal: PresealGenesis,
|
||||
}}
|
||||
|
||||
// Create a connect client and miner node
|
||||
n, sn := b(t, OneFull, minerDef)
|
||||
client := n[0].FullNode.(*impl.FullNodeAPI)
|
||||
miner := sn[0]
|
||||
s := connectAndStartMining(t, b, blocktime, client, miner)
|
||||
defer s.blockMiner.Stop()
|
||||
|
||||
// Starts a deal and waits until it's published
|
||||
runDealTillSeal := func(rseed int) {
|
||||
res, _, err := CreateClientFile(s.ctx, s.client, rseed)
|
||||
require.NoError(t, err)
|
||||
|
||||
dc := startDeal(t, s.ctx, s.miner, s.client, res.Root, false, startEpoch)
|
||||
waitDealSealed(t, s.ctx, s.miner, s.client, dc, false)
|
||||
}
|
||||
|
||||
// Run maxDealsPerMsg+1 deals in parallel
|
||||
done := make(chan struct{}, maxDealsPerMsg+1)
|
||||
for rseed := 1; rseed <= int(maxDealsPerMsg+1); rseed++ {
|
||||
rseed := rseed
|
||||
go func() {
|
||||
runDealTillSeal(rseed)
|
||||
done <- struct{}{}
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for maxDealsPerMsg of the deals to be published
|
||||
for i := 0; i < int(maxDealsPerMsg); i++ {
|
||||
<-done
|
||||
}
|
||||
|
||||
sl, err := sn[0].SectorsList(s.ctx)
|
||||
require.NoError(t, err)
|
||||
require.GreaterOrEqual(t, len(sl), 4)
|
||||
require.LessOrEqual(t, len(sl), 5)
|
||||
}
|
||||
|
||||
func TestFastRetrievalDealFlow(t *testing.T, b APIBuilder, blocktime time.Duration, startEpoch abi.ChainEpoch) {
|
||||
s := setupOneClientOneMiner(t, b, blocktime)
|
||||
defer s.blockMiner.Stop()
|
||||
|
35
extern/storage-sealing/fsm.go
vendored
35
extern/storage-sealing/fsm.go
vendored
@ -300,7 +300,9 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
|
||||
*/
|
||||
|
||||
m.stats.updateSector(m.minerSectorID(state.SectorNumber), state.State)
|
||||
if err := m.onUpdateSector(context.TODO(), state); err != nil {
|
||||
log.Errorw("update sector stats", "error", err)
|
||||
}
|
||||
|
||||
switch state.State {
|
||||
// Happy path
|
||||
@ -391,6 +393,37 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
return nil, processed, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) onUpdateSector(ctx context.Context, state *SectorInfo) error {
|
||||
if m.getConfig == nil {
|
||||
return nil // tests
|
||||
}
|
||||
|
||||
cfg, err := m.getConfig()
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting config: %w", err)
|
||||
}
|
||||
sp, err := m.currentSealProof(ctx)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting seal proof type: %w", err)
|
||||
}
|
||||
|
||||
shouldUpdateInput := m.stats.updateSector(cfg, m.minerSectorID(state.SectorNumber), state.State)
|
||||
|
||||
// trigger more input processing when we've dipped below max sealing limits
|
||||
if shouldUpdateInput {
|
||||
go func() {
|
||||
m.inputLk.Lock()
|
||||
defer m.inputLk.Unlock()
|
||||
|
||||
if err := m.updateInput(ctx, sp); err != nil {
|
||||
log.Errorf("%+v", err)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func planCommitting(events []statemachine.Event, state *SectorInfo) (uint64, error) {
|
||||
for i, event := range events {
|
||||
switch e := event.User.(type) {
|
||||
|
11
extern/storage-sealing/garbage.go
vendored
11
extern/storage-sealing/garbage.go
vendored
@ -28,18 +28,13 @@ func (m *Sealing) PledgeSector(ctx context.Context) (storage.SectorRef, error) {
|
||||
return storage.SectorRef{}, xerrors.Errorf("getting seal proof type: %w", err)
|
||||
}
|
||||
|
||||
sid, err := m.sc.Next()
|
||||
sid, err := m.createSector(ctx, cfg, spt)
|
||||
if err != nil {
|
||||
return storage.SectorRef{}, xerrors.Errorf("generating sector number: %w", err)
|
||||
}
|
||||
sectorID := m.minerSector(spt, sid)
|
||||
err = m.sealer.NewSector(ctx, sectorID)
|
||||
if err != nil {
|
||||
return storage.SectorRef{}, xerrors.Errorf("notifying sealer of the new sector: %w", err)
|
||||
return storage.SectorRef{}, err
|
||||
}
|
||||
|
||||
log.Infof("Creating CC sector %d", sid)
|
||||
return sectorID, m.sectors.Send(uint64(sid), SectorStartCC{
|
||||
return m.minerSector(spt, sid), m.sectors.Send(uint64(sid), SectorStartCC{
|
||||
ID: sid,
|
||||
SectorType: spt,
|
||||
})
|
||||
|
32
extern/storage-sealing/input.go
vendored
32
extern/storage-sealing/input.go
vendored
@ -16,6 +16,7 @@ import (
|
||||
|
||||
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -388,16 +389,9 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
|
||||
return nil
|
||||
}
|
||||
|
||||
// Now actually create a new sector
|
||||
|
||||
sid, err := m.sc.Next()
|
||||
sid, err := m.createSector(ctx, cfg, sp)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting sector number: %w", err)
|
||||
}
|
||||
|
||||
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
|
||||
if err != nil {
|
||||
return xerrors.Errorf("initializing sector: %w", err)
|
||||
return err
|
||||
}
|
||||
|
||||
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
|
||||
@ -407,6 +401,26 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
|
||||
})
|
||||
}
|
||||
|
||||
// call with m.inputLk
|
||||
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
|
||||
// Now actually create a new sector
|
||||
|
||||
sid, err := m.sc.Next()
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("getting sector number: %w", err)
|
||||
}
|
||||
|
||||
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
|
||||
if err != nil {
|
||||
return 0, xerrors.Errorf("initializing sector: %w", err)
|
||||
}
|
||||
|
||||
// update stats early, fsm planner would do that async
|
||||
m.stats.updateSector(cfg, m.minerSectorID(sid), UndefinedSectorState)
|
||||
|
||||
return sid, nil
|
||||
}
|
||||
|
||||
func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
||||
}
|
||||
|
39
extern/storage-sealing/stats.go
vendored
39
extern/storage-sealing/stats.go
vendored
@ -4,6 +4,7 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
|
||||
)
|
||||
|
||||
type statSectorState int
|
||||
@ -23,10 +24,14 @@ type SectorStats struct {
|
||||
totals [nsst]uint64
|
||||
}
|
||||
|
||||
func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
||||
func (ss *SectorStats) updateSector(cfg sealiface.Config, id abi.SectorID, st SectorState) (updateInput bool) {
|
||||
ss.lk.Lock()
|
||||
defer ss.lk.Unlock()
|
||||
|
||||
preSealing := ss.curSealingLocked()
|
||||
preStaging := ss.curStagingLocked()
|
||||
|
||||
// update totals
|
||||
oldst, found := ss.bySector[id]
|
||||
if found {
|
||||
ss.totals[oldst]--
|
||||
@ -35,6 +40,34 @@ func (ss *SectorStats) updateSector(id abi.SectorID, st SectorState) {
|
||||
sst := toStatState(st)
|
||||
ss.bySector[id] = sst
|
||||
ss.totals[sst]++
|
||||
|
||||
// check if we may need be able to process more deals
|
||||
sealing := ss.curSealingLocked()
|
||||
staging := ss.curStagingLocked()
|
||||
|
||||
log.Debugw("sector stats", "sealing", sealing, "staging", staging)
|
||||
|
||||
if cfg.MaxSealingSectorsForDeals > 0 && // max sealing deal sector limit set
|
||||
preSealing >= cfg.MaxSealingSectorsForDeals && // we were over limit
|
||||
sealing < cfg.MaxSealingSectorsForDeals { // and we're below the limit now
|
||||
updateInput = true
|
||||
}
|
||||
|
||||
if cfg.MaxWaitDealsSectors > 0 && // max waiting deal sector limit set
|
||||
preStaging >= cfg.MaxWaitDealsSectors && // we were over limit
|
||||
staging < cfg.MaxWaitDealsSectors { // and we're below the limit now
|
||||
updateInput = true
|
||||
}
|
||||
|
||||
return updateInput
|
||||
}
|
||||
|
||||
func (ss *SectorStats) curSealingLocked() uint64 {
|
||||
return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||
}
|
||||
|
||||
func (ss *SectorStats) curStagingLocked() uint64 {
|
||||
return ss.totals[sstStaging]
|
||||
}
|
||||
|
||||
// return the number of sectors currently in the sealing pipeline
|
||||
@ -42,7 +75,7 @@ func (ss *SectorStats) curSealing() uint64 {
|
||||
ss.lk.Lock()
|
||||
defer ss.lk.Unlock()
|
||||
|
||||
return ss.totals[sstStaging] + ss.totals[sstSealing] + ss.totals[sstFailed]
|
||||
return ss.curSealingLocked()
|
||||
}
|
||||
|
||||
// return the number of sectors waiting to enter the sealing pipeline
|
||||
@ -50,5 +83,5 @@ func (ss *SectorStats) curStaging() uint64 {
|
||||
ss.lk.Lock()
|
||||
defer ss.lk.Unlock()
|
||||
|
||||
return ss.totals[sstStaging]
|
||||
return ss.curStagingLocked()
|
||||
}
|
||||
|
@ -58,6 +58,9 @@ func TestAPIDealFlow(t *testing.T) {
|
||||
t.Run("TestPublishDealsBatching", func(t *testing.T) {
|
||||
test.TestPublishDealsBatching(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||
})
|
||||
t.Run("TestBatchDealInput", func(t *testing.T) {
|
||||
test.TestBatchDealInput(t, builder.MockSbBuilder, blockTime, dealStartEpoch)
|
||||
})
|
||||
}
|
||||
|
||||
func TestAPIDealFlowReal(t *testing.T) {
|
||||
|
Loading…
Reference in New Issue
Block a user