feat: sealing: Sector upgrade queue

This commit is contained in:
Łukasz Magiera 2022-03-16 17:33:05 +01:00
parent 5ea1502b02
commit 2cef55a4f6
9 changed files with 158 additions and 35 deletions

View File

@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{
{col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgBlue, state: sealing.Empty},

View File

@ -283,6 +283,9 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne(
on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty),
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
),
Terminating: planOne(
@ -558,6 +561,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal
case Proving:
return m.handleProvingSector, processed, nil
case Available:
return m.handleAvailableSector, processed, nil
case Terminating:
return m.handleTerminating, processed, nil
case TerminateWait:

View File

@ -297,6 +297,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
// Snap deals // CC update path
type SectorMarkForUpdate struct{}
func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}
type SectorStartCCUpdate struct{}
func (evt SectorStartCCUpdate) apply(state *SectorInfo) {

View File

@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp)
}
type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)
// called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, big.Zero(), err
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}
ssize, err := sp.SectorSize()
if err != nil {
return err
@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
toAssign[proposalCid] = struct{}{}
memo := make(map[abi.SectorNumber]abi.ChainEpoch)
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
if exp, ok := memo[sn]; ok {
return exp, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, err
}
memo[sn] = onChainInfo.Expiration
return onChainInfo.Expiration, nil
}
for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue
}
if !ok {
exp, _ := expF(sector.number)
exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue
}
@ -497,7 +508,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryCreateDealSector(ctx, sp); err != nil {
if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err)
}
}
@ -505,7 +516,91 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
return nil
}
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error {
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (target abi.ChainEpoch) {
var candidates []*pendingPiece
for _, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
candidates = append(candidates, piece)
}
// earliest expiration first
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].deal.DealProposal.EndEpoch < candidates[i].deal.DealProposal.EndEpoch
})
var totalBytes uint64
for _, candidate := range candidates {
totalBytes += uint64(candidate.size)
if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidate.deal.DealProposal.EndEpoch
}
}
_, curEpoch, err := m.Api.ChainHead(ctx)
if err != nil {
log.Errorf("getting current epoch: %s", err)
return 0
}
_, maxDur := policy.DealDurationBounds(0)
return curEpoch + maxDur
}
func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
ssize, _ := sp.SectorSize() // error already checked in the caller
targetExpiration := m.calcTargetExpiration(ctx, ssize)
var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
bestPledge := types.TotalFilecoinInt
for s := range m.available {
expiration, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}
// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target
if bestExpiration < targetExpiration {
if expiration > bestExpiration {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
continue
}
if expiration > targetExpiration && pledge.LessThan(bestPledge) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
}
if bestExpiration == 0 {
// didn't find a good sector
return false, nil
}
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
delete(m.available, candidate)
m.creating = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait()
if m.creating != nil {
@ -517,10 +612,6 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return xerrors.Errorf("getting storage config: %w", err)
}
if !cfg.MakeNewSectorForDeals {
return nil
}
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil
}
@ -529,6 +620,18 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return nil
}
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}
if !cfg.MakeNewSectorForDeals {
return nil
}
sid, err := m.createSector(ctx, cfg, sp)
if err != nil {
return err

View File

@ -108,6 +108,7 @@ type Sealing struct {
upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{}
available map[abi.SectorID]struct{}
notifee SectorStateNotifee
addrSel AddrSel
@ -129,11 +130,11 @@ type openSector struct {
maybeAccept func(cid.Cid) error // called with inputLk
}
func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi.SectorNumber) (abi.ChainEpoch, error)) (bool, error) {
func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) {
if !o.ccUpdate {
return true, nil
}
expiration, err := expF(o.number)
expiration, _, err := expF(o.number)
if err != nil {
return false, err
}
@ -179,6 +180,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
assignedPieces: map[abi.SectorID][]cid.Cid{},
toUpgrade: map[abi.SectorNumber]struct{}{},
available: map[abi.SectorID]struct{}{},
notifee: notifee,
addrSel: as,

View File

@ -25,6 +25,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
CommitAggregateWait: {},
FinalizeSector: {},
Proving: {},
Available: {},
FailedUnrecoverable: {},
SealPreCommit1Failed: {},
SealPreCommit2Failed: {},
@ -98,6 +99,7 @@ const (
FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving"
Available SectorState = "Available" // proving CC available for SnapDeals
// snap deals / cc update
SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals"
@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving
}
return sstSealing
case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
case Proving, Available, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving
}

View File

@ -130,6 +130,11 @@ func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) er
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration
m.inputLk.Lock()
// in case we revert into Proving without going into Available
delete(m.available, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting sealing config: %w", err)
@ -144,3 +149,13 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf
return nil
}
func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{}
m.inputLk.Unlock()
// TODO: Watch termination
// TODO: Auto-extend if set
return nil
}

View File

@ -50,17 +50,6 @@ func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
}
func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
curStaging := m.stats.curStaging()
if cfg.MaxWaitDealsSectors > 0 && curStaging >= cfg.MaxWaitDealsSectors {
return xerrors.Errorf("already waiting for deals in %d >= %d (cfg.MaxWaitDealsSectors) sectors, no free resources to wait for deals in another",
curStaging, cfg.MaxWaitDealsSectors)
}
si, err := m.GetSectorInfo(id)
if err != nil {
return xerrors.Errorf("getting sector info: %w", err)
@ -100,7 +89,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
"Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration)
}
return m.sectors.Send(uint64(id), SectorStartCCUpdate{})
return m.sectors.Send(uint64(id), SectorMarkForUpdate{})
}
func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, tok TipSetToken, sector abi.SectorNumber) (bool, error) {

View File

@ -15,5 +15,6 @@ func QuietMiningLogs() {
_ = logging.SetLogLevel("storageminer", "ERROR")
_ = logging.SetLogLevel("pubsub", "ERROR")
_ = logging.SetLogLevel("gen", "ERROR")
_ = logging.SetLogLevel("rpc", "ERROR")
_ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR")
}