Merge pull request #8333 from filecoin-project/backport/v1.15.1/feat/snap-queue

backport: v1.15.1: feat: sealing: Sector upgrade queue
This commit is contained in:
Łukasz Magiera 2022-03-16 22:50:33 +01:00 committed by GitHub
commit 114cb4e0d5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
21 changed files with 291 additions and 117 deletions

View File

@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{}
var stateList = []stateMeta{ var stateList = []stateMeta{
{col: 39, state: "Total"}, {col: 39, state: "Total"},
{col: color.FgGreen, state: sealing.Proving}, {col: color.FgGreen, state: sealing.Proving},
{col: color.FgGreen, state: sealing.Available},
{col: color.FgGreen, state: sealing.UpdateActivating}, {col: color.FgGreen, state: sealing.UpdateActivating},
{col: color.FgBlue, state: sealing.Empty}, {col: color.FgBlue, state: sealing.Empty},

View File

@ -351,7 +351,7 @@ var sectorsListCmd = &cli.Command{
if cctx.Bool("unproven") { if cctx.Bool("unproven") {
for state := range sealing.ExistSectorStateList { for state := range sealing.ExistSectorStateList {
if state == sealing.Proving { if state == sealing.Proving || state == sealing.Available {
continue continue
} }
states = append(states, api.SectorState(state)) states = append(states, api.SectorState(state))

View File

@ -365,6 +365,12 @@
# env var: LOTUS_SEALING_FINALIZEEARLY # env var: LOTUS_SEALING_FINALIZEEARLY
#FinalizeEarly = false #FinalizeEarly = false
# After sealing CC sectors, make them available for upgrading with deals
#
# type: bool
# env var: LOTUS_SEALING_MAKECCSECTORSAVAILABLE
#MakeCCSectorsAvailable = false
# Whether to use available miner balance for sector collateral instead of sending it with each message # Whether to use available miner balance for sector collateral instead of sending it with each message
# #
# type: bool # type: bool

View File

@ -111,6 +111,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Committing: planCommitting, Committing: planCommitting,
CommitFinalize: planOne( CommitFinalize: planOne(
on(SectorFinalized{}, SubmitCommit), on(SectorFinalized{}, SubmitCommit),
on(SectorFinalizedAvailable{}, SubmitCommit),
on(SectorFinalizeFailed{}, CommitFinalizeFailed), on(SectorFinalizeFailed{}, CommitFinalizeFailed),
), ),
SubmitCommit: planOne( SubmitCommit: planOne(
@ -136,6 +137,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
FinalizeSector: planOne( FinalizeSector: planOne(
on(SectorFinalized{}, Proving), on(SectorFinalized{}, Proving),
on(SectorFinalizedAvailable{}, Available),
on(SectorFinalizeFailed{}, FinalizeFailed), on(SectorFinalizeFailed{}, FinalizeFailed),
), ),
@ -283,7 +285,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
Proving: planOne( Proving: planOne(
on(SectorFaultReported{}, FaultReported), on(SectorFaultReported{}, FaultReported),
on(SectorFaulty{}, Faulty), on(SectorFaulty{}, Faulty),
on(SectorMarkForUpdate{}, Available),
),
Available: planOne(
on(SectorStartCCUpdate{}, SnapDealsWaitDeals), on(SectorStartCCUpdate{}, SnapDealsWaitDeals),
on(SectorAbortUpgrade{}, Proving),
), ),
Terminating: planOne( Terminating: planOne(
on(SectorTerminating{}, TerminateWait), on(SectorTerminating{}, TerminateWait),
@ -558,6 +564,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
// Post-seal // Post-seal
case Proving: case Proving:
return m.handleProvingSector, processed, nil return m.handleProvingSector, processed, nil
case Available:
return m.handleAvailableSector, processed, nil
case Terminating: case Terminating:
return m.handleTerminating, processed, nil return m.handleTerminating, processed, nil
case TerminateWait: case TerminateWait:

View File

@ -286,6 +286,10 @@ type SectorFinalized struct{}
func (evt SectorFinalized) apply(*SectorInfo) {} func (evt SectorFinalized) apply(*SectorInfo) {}
type SectorFinalizedAvailable struct{}
func (evt SectorFinalizedAvailable) apply(*SectorInfo) {}
type SectorRetryFinalize struct{} type SectorRetryFinalize struct{}
func (evt SectorRetryFinalize) apply(*SectorInfo) {} func (evt SectorRetryFinalize) apply(*SectorInfo) {}
@ -297,6 +301,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {}
// Snap deals // CC update path // Snap deals // CC update path
type SectorMarkForUpdate struct{}
func (evt SectorMarkForUpdate) apply(state *SectorInfo) {}
type SectorStartCCUpdate struct{} type SectorStartCCUpdate struct{}
func (evt SectorStartCCUpdate) apply(state *SectorInfo) { func (evt SectorStartCCUpdate) apply(state *SectorInfo) {

View File

@ -12,11 +12,14 @@ import (
"github.com/filecoin-project/go-commp-utils/zerocomm" "github.com/filecoin-project/go-commp-utils/zerocomm"
"github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-padreader"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/go-statemachine"
"github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/specs-storage/storage"
"github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/build"
"github.com/filecoin-project/lotus/chain/actors/policy"
"github.com/filecoin-project/lotus/chain/types"
sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage"
"github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper"
"github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface"
@ -30,8 +33,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
m.inputLk.Lock() m.inputLk.Lock()
if m.creating != nil && *m.creating == sector.SectorNumber { if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
m.creating = nil m.nextDealSector = nil
} }
sid := m.minerSectorID(sector.SectorNumber) sid := m.minerSectorID(sector.SectorNumber)
@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error {
return m.updateInput(ctx, sp) return m.updateInput(ctx, sp)
} }
type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)
// called with m.inputLk // called with m.inputLk
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error { func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
memo := make(map[abi.SectorNumber]struct {
e abi.ChainEpoch
p abi.TokenAmount
})
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok {
return e.e, e.p, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, big.Zero(), err
}
memo[sn] = struct {
e abi.ChainEpoch
p abi.TokenAmount
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
}
ssize, err := sp.SectorSize() ssize, err := sp.SectorSize()
if err != nil { if err != nil {
return err return err
@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
toAssign[proposalCid] = struct{}{} toAssign[proposalCid] = struct{}{}
memo := make(map[abi.SectorNumber]abi.ChainEpoch)
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) {
if exp, ok := memo[sn]; ok {
return exp, nil
}
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{})
if err != nil {
return 0, err
}
memo[sn] = onChainInfo.Expiration
return onChainInfo.Expiration, nil
}
for id, sector := range m.openSectors { for id, sector := range m.openSectors {
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain // check that sector lifetime is long enough to fit deal using latest expiration from on chain
@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
continue continue
} }
if !ok { if !ok {
exp, _ := expF(sector.number) exp, _, _ := expF(sector.number)
log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch) log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue continue
} }
@ -497,7 +508,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
if len(toAssign) > 0 { if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors) log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryCreateDealSector(ctx, sp); err != nil { if err := m.tryGetDealSector(ctx, sp, expF); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err) log.Errorw("Failed to create a new sector for deals", "error", err)
} }
} }
@ -505,10 +516,113 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
return nil return nil
} }
func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) {
var candidates []*pendingPiece
for _, piece := range m.pendingPieces {
if piece.assigned {
continue // already assigned to a sector, skip
}
candidates = append(candidates, piece)
}
// earliest expiration first
sort.Slice(candidates, func(i, j int) bool {
return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
})
var totalBytes uint64
for _, candidate := range candidates {
totalBytes += uint64(candidate.size)
if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil
}
}
_, curEpoch, err := m.Api.ChainHead(ctx)
if err != nil {
return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
}
minDur, maxDur := policy.DealDurationBounds(0)
return curEpoch + minDur, curEpoch + maxDur, nil
}
func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) {
if len(m.available) == 0 {
return false, nil
}
ssize, err := sp.SectorSize()
if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err)
}
minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize)
if err != nil {
return false, xerrors.Errorf("calculating min target expiration: %w", err)
}
var candidate abi.SectorID
var bestExpiration abi.ChainEpoch
bestPledge := types.TotalFilecoinInt
for s := range m.available {
expiration, pledge, err := ef(s.Number)
if err != nil {
log.Errorw("checking sector expiration", "error", err)
continue
}
slowChecks := func(sid abi.SectorNumber) bool {
active, err := m.sectorActive(ctx, TipSetToken{}, sid)
if err != nil {
log.Errorw("checking sector active", "error", err)
return false
}
if !active {
log.Debugw("skipping available sector", "reason", "not active")
return false
}
return true
}
// if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target
if bestExpiration < targetExpiration {
if expiration > bestExpiration && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
continue
}
if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
bestExpiration = expiration
bestPledge = pledge
candidate = s
}
}
if bestExpiration < minExpiration {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate)
// didn't find a good sector / no sectors were available
return false, nil
}
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
delete(m.available, candidate)
m.nextDealSector = &candidate.Number
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
}
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
m.startupWait.Wait() m.startupWait.Wait()
if m.creating != nil { if m.nextDealSector != nil {
return nil // new sector is being created right now return nil // new sector is being created right now
} }
@ -517,10 +631,6 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return xerrors.Errorf("getting storage config: %w", err) return xerrors.Errorf("getting storage config: %w", err)
} }
if !cfg.MakeNewSectorForDeals {
return nil
}
if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals { if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals {
return nil return nil
} }
@ -529,12 +639,24 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal
return nil return nil
} }
got, err := m.tryGetUpgradeSector(ctx, sp, ef)
if err != nil {
return err
}
if got {
return nil
}
if !cfg.MakeNewSectorForDeals {
return nil
}
sid, err := m.createSector(ctx, cfg, sp) sid, err := m.createSector(ctx, cfg, sp)
if err != nil { if err != nil {
return err return err
} }
m.creating = &sid m.nextDealSector = &sid
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
return m.sectors.Send(uint64(sid), SectorStart{ return m.sectors.Send(uint64(sid), SectorStart{
@ -573,6 +695,11 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error {
func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error {
m.startupWait.Wait() m.startupWait.Wait()
m.inputLk.Lock()
// always do this early
delete(m.available, m.minerSectorID(sid))
m.inputLk.Unlock()
log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user") log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")}) return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
} }

View File

@ -9,6 +9,7 @@ import (
reflect "reflect" reflect "reflect"
address "github.com/filecoin-project/go-address" address "github.com/filecoin-project/go-address"
bitfield "github.com/filecoin-project/go-bitfield"
abi "github.com/filecoin-project/go-state-types/abi" abi "github.com/filecoin-project/go-state-types/abi"
big "github.com/filecoin-project/go-state-types/big" big "github.com/filecoin-project/go-state-types/big"
crypto "github.com/filecoin-project/go-state-types/crypto" crypto "github.com/filecoin-project/go-state-types/crypto"
@ -214,10 +215,10 @@ func (mr *MockSealingAPIMockRecorder) StateMarketStorageDealProposal(arg0, arg1,
} }
// StateMinerActiveSectors mocks base method. // StateMinerActiveSectors mocks base method.
func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) { func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (bitfield.BitField, error) {
m.ctrl.T.Helper() m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "StateMinerActiveSectors", arg0, arg1, arg2) ret := m.ctrl.Call(m, "StateMinerActiveSectors", arg0, arg1, arg2)
ret0, _ := ret[0].([]*miner.SectorOnChainInfo) ret0, _ := ret[0].(bitfield.BitField)
ret1, _ := ret[1].(error) ret1, _ := ret[1].(error)
return ret0, ret1 return ret0, ret1
} }

View File

@ -20,6 +20,8 @@ type Config struct {
MakeNewSectorForDeals bool MakeNewSectorForDeals bool
MakeCCSectorsAvailable bool
WaitDealsDelay time.Duration WaitDealsDelay time.Duration
CommittedCapacitySectorLifetime time.Duration CommittedCapacitySectorLifetime time.Duration

View File

@ -13,6 +13,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
@ -63,7 +64,7 @@ type SealingAPI interface {
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
StateMinerAvailableBalance(context.Context, address.Address, TipSetToken) (big.Int, error) StateMinerAvailableBalance(context.Context, address.Address, TipSetToken) (big.Int, error)
StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error)
StateMinerActiveSectors(context.Context, address.Address, TipSetToken) ([]*miner.SectorOnChainInfo, error) StateMinerActiveSectors(context.Context, address.Address, TipSetToken) (bitfield.BitField, error)
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error)
StateMarketStorageDealProposal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateMarketStorageDealProposal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error)
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
@ -104,10 +105,11 @@ type Sealing struct {
sectorTimers map[abi.SectorID]*time.Timer sectorTimers map[abi.SectorID]*time.Timer
pendingPieces map[cid.Cid]*pendingPiece pendingPieces map[cid.Cid]*pendingPiece
assignedPieces map[abi.SectorID][]cid.Cid assignedPieces map[abi.SectorID][]cid.Cid
creating *abi.SectorNumber // used to prevent a race where we could create a new sector more than once nextDealSector *abi.SectorNumber // used to prevent a race where we could create a new sector more than once
upgradeLk sync.Mutex upgradeLk sync.Mutex
toUpgrade map[abi.SectorNumber]struct{} toUpgrade map[abi.SectorNumber]struct{}
available map[abi.SectorID]struct{}
notifee SectorStateNotifee notifee SectorStateNotifee
addrSel AddrSel addrSel AddrSel
@ -129,11 +131,11 @@ type openSector struct {
maybeAccept func(cid.Cid) error // called with inputLk maybeAccept func(cid.Cid) error // called with inputLk
} }
func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi.SectorNumber) (abi.ChainEpoch, error)) (bool, error) { func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) {
if !o.ccUpdate { if !o.ccUpdate {
return true, nil return true, nil
} }
expiration, err := expF(o.number) expiration, _, err := expF(o.number)
if err != nil { if err != nil {
return false, err return false, err
} }
@ -179,6 +181,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events
assignedPieces: map[abi.SectorID][]cid.Cid{}, assignedPieces: map[abi.SectorID][]cid.Cid{},
toUpgrade: map[abi.SectorNumber]struct{}{}, toUpgrade: map[abi.SectorNumber]struct{}{},
available: map[abi.SectorID]struct{}{},
notifee: notifee, notifee: notifee,
addrSel: as, addrSel: as,

View File

@ -25,6 +25,7 @@ var ExistSectorStateList = map[SectorState]struct{}{
CommitAggregateWait: {}, CommitAggregateWait: {},
FinalizeSector: {}, FinalizeSector: {},
Proving: {}, Proving: {},
Available: {},
FailedUnrecoverable: {}, FailedUnrecoverable: {},
SealPreCommit1Failed: {}, SealPreCommit1Failed: {},
SealPreCommit2Failed: {}, SealPreCommit2Failed: {},
@ -98,6 +99,7 @@ const (
FinalizeSector SectorState = "FinalizeSector" FinalizeSector SectorState = "FinalizeSector"
Proving SectorState = "Proving" Proving SectorState = "Proving"
Available SectorState = "Available" // proving CC available for SnapDeals
// snap deals / cc update // snap deals / cc update
SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals" SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals"
@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState {
return sstProving return sstProving
} }
return sstSealing return sstSealing
case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: case Proving, Available, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
return sstProving return sstProving
} }

View File

@ -238,7 +238,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect
} }
// Abort upgrade for sectors that went faulty since being marked for upgrade // Abort upgrade for sectors that went faulty since being marked for upgrade
active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber) active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
if err != nil { if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err) log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil return nil

View File

@ -130,6 +130,11 @@ func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) er
func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error {
// TODO: track sector health / expiration // TODO: track sector health / expiration
m.inputLk.Lock()
// in case we revert into Proving without going into Available
delete(m.available, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock()
cfg, err := m.getConfig() cfg, err := m.getConfig()
if err != nil { if err != nil {
return xerrors.Errorf("getting sealing config: %w", err) return xerrors.Errorf("getting sealing config: %w", err)
@ -144,3 +149,13 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf
return nil return nil
} }
func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error {
m.inputLk.Lock()
m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{}
m.inputLk.Unlock()
// TODO: Watch termination
// TODO: Auto-extend if set
return nil
}

View File

@ -41,7 +41,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect
log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err) log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err)
return nil return nil
} }
active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber) active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber)
if err != nil { if err != nil {
log.Errorf("sector active check: api error, not proceeding: %+v", err) log.Errorf("sector active check: api error, not proceeding: %+v", err)
return nil return nil

View File

@ -782,5 +782,8 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn
return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)})
} }
if cfg.MakeCCSectorsAvailable && !sector.hasDeals() {
return ctx.Send(SectorFinalizedAvailable{})
}
return ctx.Send(SectorFinalized{}) return ctx.Send(SectorFinalized{})
} }

View File

@ -3,14 +3,13 @@ package sealing
import ( import (
"context" "context"
"github.com/filecoin-project/go-address"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market"
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market"
) )
func (m *Sealing) IsMarkedForUpgrade(id abi.SectorNumber) bool { func (m *Sealing) IsMarkedForUpgrade(id abi.SectorNumber) bool {
@ -50,17 +49,6 @@ func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
} }
func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error { func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error {
cfg, err := m.getConfig()
if err != nil {
return xerrors.Errorf("getting storage config: %w", err)
}
curStaging := m.stats.curStaging()
if cfg.MaxWaitDealsSectors > 0 && curStaging >= cfg.MaxWaitDealsSectors {
return xerrors.Errorf("already waiting for deals in %d >= %d (cfg.MaxWaitDealsSectors) sectors, no free resources to wait for deals in another",
curStaging, cfg.MaxWaitDealsSectors)
}
si, err := m.GetSectorInfo(id) si, err := m.GetSectorInfo(id)
if err != nil { if err != nil {
return xerrors.Errorf("getting sector info: %w", err) return xerrors.Errorf("getting sector info: %w", err)
@ -70,11 +58,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade") return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade")
} }
if len(si.Pieces) != 1 { if si.hasDeals() {
return xerrors.Errorf("not a committed-capacity sector, expected 1 piece")
}
if si.Pieces[0].DealInfo != nil {
return xerrors.Errorf("not a committed-capacity sector, has deals") return xerrors.Errorf("not a committed-capacity sector, has deals")
} }
@ -87,7 +71,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
return xerrors.Errorf("failed to read sector on chain info: %w", err) return xerrors.Errorf("failed to read sector on chain info: %w", err)
} }
active, err := sectorActive(ctx, m.Api, m.maddr, tok, id) active, err := m.sectorActive(ctx, tok, id)
if err != nil { if err != nil {
return xerrors.Errorf("failed to check if sector is active") return xerrors.Errorf("failed to check if sector is active")
} }
@ -100,24 +84,16 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e
"Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration) "Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration)
} }
return m.sectors.Send(uint64(id), SectorStartCCUpdate{}) return m.sectors.Send(uint64(id), SectorMarkForUpdate{})
} }
func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, tok TipSetToken, sector abi.SectorNumber) (bool, error) { func (m *Sealing) sectorActive(ctx context.Context, tok TipSetToken, sector abi.SectorNumber) (bool, error) {
active, err := api.StateMinerActiveSectors(ctx, maddr, tok) active, err := m.Api.StateMinerActiveSectors(ctx, m.maddr, tok)
if err != nil { if err != nil {
return false, xerrors.Errorf("failed to check active sectors: %w", err) return false, xerrors.Errorf("failed to check active sectors: %w", err)
} }
// Ensure the upgraded sector is active return active.IsSet(uint64(sector))
var found bool
for _, si := range active {
if si.SectorNumber == sector {
found = true
break
}
}
return found, nil
} }
func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreCommitInfo) big.Int { func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreCommitInfo) big.Int {

View File

@ -10,12 +10,14 @@ import (
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-state-types/network"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
"github.com/filecoin-project/lotus/itests/kit"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require" "github.com/stretchr/testify/require"
"github.com/filecoin-project/lotus/api"
"github.com/filecoin-project/lotus/chain/types"
sealing "github.com/filecoin-project/lotus/extern/storage-sealing"
"github.com/filecoin-project/lotus/itests/kit"
) )
func TestCCUpgrade(t *testing.T) { func TestCCUpgrade(t *testing.T) {
@ -48,9 +50,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
fmt.Printf("CCUpgrade: %d\n", CCUpgrade) fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
// wait for deadline 0 to pass so that committing starts after post on preseals
// this gives max time for post to complete minimizing chances of timeout
// waitForDeadline(ctx, t, 1, client, maddr)
miner.PledgeSectors(ctx, 1, 0, nil) miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx) sl, err := miner.SectorsList(ctx)
require.NoError(t, err) require.NoError(t, err)
@ -89,18 +88,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode {
return client return client
} }
func waitForDeadline(ctx context.Context, t *testing.T, waitIdx uint64, node *kit.TestFullNode, maddr address.Address) {
for {
ts, err := node.ChainHead(ctx)
require.NoError(t, err)
dl, err := node.StateMinerProvingDeadline(ctx, maddr, ts.Key())
require.NoError(t, err)
if dl.Index == waitIdx {
return
}
}
}
func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) { func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) {
for { for {
active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK)
@ -116,18 +103,6 @@ func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber,
} }
} }
func waitForSectorStartUpgrade(ctx context.Context, t *testing.T, sn abi.SectorNumber, miner *kit.TestMiner) {
for {
si, err := miner.StorageMiner.SectorsStatus(ctx, sn, false)
require.NoError(t, err)
if si.State != api.SectorState("Proving") {
t.Logf("Done proving sector in state: %s", si.State)
return
}
}
}
func TestCCUpgradeAndPoSt(t *testing.T) { func TestCCUpgradeAndPoSt(t *testing.T) {
kit.QuietMiningLogs() kit.QuietMiningLogs()
t.Run("upgrade and then post", func(t *testing.T) { t.Run("upgrade and then post", func(t *testing.T) {
@ -148,13 +123,13 @@ func TestCCUpgradeAndPoSt(t *testing.T) {
}) })
} }
func TestTooManyMarkedForUpgrade(t *testing.T) { func TestAbortUpgradeAvailable(t *testing.T) {
kit.QuietMiningLogs() kit.QuietMiningLogs()
ctx := context.Background() ctx := context.Background()
blockTime := 1 * time.Millisecond blockTime := 1 * time.Millisecond
client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15)) client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC())
ens.InterconnectAll().BeginMiningMustPost(blockTime) ens.InterconnectAll().BeginMiningMustPost(blockTime)
maddr, err := miner.ActorAddress(ctx) maddr, err := miner.ActorAddress(ctx)
@ -163,32 +138,53 @@ func TestTooManyMarkedForUpgrade(t *testing.T) {
} }
CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1)
waitForDeadline(ctx, t, 1, client, maddr) fmt.Printf("CCUpgrade: %d\n", CCUpgrade)
miner.PledgeSectors(ctx, 3, 0, nil)
miner.PledgeSectors(ctx, 1, 0, nil)
sl, err := miner.SectorsList(ctx) sl, err := miner.SectorsList(ctx)
require.NoError(t, err) require.NoError(t, err)
require.Len(t, sl, 3, "expected 3 sectors") require.Len(t, sl, 1, "expected 1 sector")
require.Equal(t, CCUpgrade, sl[0], "unexpected sector number")
{ {
si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK)
require.NoError(t, err) require.NoError(t, err)
require.Less(t, 50000, int(si.Expiration)) require.Less(t, 50000, int(si.Expiration))
} }
waitForSectorActive(ctx, t, CCUpgrade, client, maddr) waitForSectorActive(ctx, t, CCUpgrade, client, maddr)
waitForSectorActive(ctx, t, CCUpgrade+1, client, maddr)
waitForSectorActive(ctx, t, CCUpgrade+2, client, maddr)
err = miner.SectorMarkForUpgrade(ctx, CCUpgrade, true) err = miner.SectorMarkForUpgrade(ctx, sl[0], true)
require.NoError(t, err)
err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+1, true)
require.NoError(t, err) require.NoError(t, err)
waitForSectorStartUpgrade(ctx, t, CCUpgrade, miner) sl, err = miner.SectorsList(ctx)
waitForSectorStartUpgrade(ctx, t, CCUpgrade+1, miner) require.NoError(t, err)
require.Len(t, sl, 1, "expected 1 sector")
err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+2, true) ss, err := miner.SectorsStatus(ctx, sl[0], false)
require.Error(t, err) require.NoError(t, err)
assert.Contains(t, err.Error(), "no free resources to wait for deals")
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Proving) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Available), ss.State)
break
}
require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0]))
for i := 0; i < 100; i++ {
ss, err = miner.SectorsStatus(ctx, sl[0], false)
require.NoError(t, err)
if ss.State == api.SectorState(sealing.Available) {
time.Sleep(50 * time.Millisecond)
continue
}
require.Equal(t, api.SectorState(sealing.Proving), ss.State)
break
}
} }

View File

@ -15,5 +15,6 @@ func QuietMiningLogs() {
_ = logging.SetLogLevel("storageminer", "ERROR") _ = logging.SetLogLevel("storageminer", "ERROR")
_ = logging.SetLogLevel("pubsub", "ERROR") _ = logging.SetLogLevel("pubsub", "ERROR")
_ = logging.SetLogLevel("gen", "ERROR") _ = logging.SetLogLevel("gen", "ERROR")
_ = logging.SetLogLevel("rpc", "ERROR")
_ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR") _ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR")
} }

View File

@ -750,6 +750,12 @@ avoid the relatively high cost of unsealing the data later, at the cost of more
Comment: `Run sector finalization before submitting sector proof to the chain`, Comment: `Run sector finalization before submitting sector proof to the chain`,
}, },
{
Name: "MakeCCSectorsAvailable",
Type: "bool",
Comment: `After sealing CC sectors, make them available for upgrading with deals`,
},
{ {
Name: "CollateralFromMinerBalance", Name: "CollateralFromMinerBalance",
Type: "bool", Type: "bool",

View File

@ -250,6 +250,9 @@ type SealingConfig struct {
// Run sector finalization before submitting sector proof to the chain // Run sector finalization before submitting sector proof to the chain
FinalizeEarly bool FinalizeEarly bool
// After sealing CC sectors, make them available for upgrading with deals
MakeCCSectorsAvailable bool
// Whether to use available miner balance for sector collateral instead of sending it with each message // Whether to use available miner balance for sector collateral instead of sending it with each message
CollateralFromMinerBalance bool CollateralFromMinerBalance bool
// Minimum available balance to keep in the miner actor before sending it with messages // Minimum available balance to keep in the miner actor before sending it with messages

View File

@ -897,6 +897,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable,
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy, AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly, FinalizeEarly: cfg.FinalizeEarly,
@ -935,6 +936,7 @@ func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config {
MakeNewSectorForDeals: cfg.Dealmaking.MakeNewSectorForDeals, MakeNewSectorForDeals: cfg.Dealmaking.MakeNewSectorForDeals,
CommittedCapacitySectorLifetime: time.Duration(cfg.Sealing.CommittedCapacitySectorLifetime), CommittedCapacitySectorLifetime: time.Duration(cfg.Sealing.CommittedCapacitySectorLifetime),
WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay),
MakeCCSectorsAvailable: cfg.Sealing.MakeCCSectorsAvailable,
AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.Sealing.FinalizeEarly, FinalizeEarly: cfg.Sealing.FinalizeEarly,

View File

@ -9,6 +9,7 @@ import (
"golang.org/x/xerrors" "golang.org/x/xerrors"
"github.com/filecoin-project/go-address" "github.com/filecoin-project/go-address"
"github.com/filecoin-project/go-bitfield"
"github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/big"
"github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/crypto"
@ -112,13 +113,25 @@ func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr
return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk) return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk)
} }
func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) { func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (bitfield.BitField, error) {
tsk, err := types.TipSetKeyFromBytes(tok) tsk, err := types.TipSetKeyFromBytes(tok)
if err != nil { if err != nil {
return nil, xerrors.Errorf("faile dto unmarshal TipSetToken to TipSetKey: %w", err) return bitfield.BitField{}, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
} }
return s.delegate.StateMinerActiveSectors(ctx, maddr, tsk) act, err := s.delegate.StateGetActor(ctx, maddr, tsk)
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("getting miner actor: temp error: %+v", err)
}
stor := store.ActorStore(ctx, blockstore.NewAPIBlockstore(s.delegate))
state, err := miner.Load(stor, act)
if err != nil {
return bitfield.BitField{}, xerrors.Errorf("loading miner state: %+v", err)
}
return miner.AllPartSectors(state, miner.Partition.ActiveSectors)
} }
func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) { func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) {