Merge pull request #9310 from filecoin-project/fix/empty-snap

fix: sealing: Abort upgrades in sectors with no deals
This commit is contained in:
Jiaying Wang 2022-09-16 21:21:39 -04:00 committed by GitHub
commit 4abc38dacc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 145 additions and 36 deletions

View File

@ -442,6 +442,30 @@
# env var: LOTUS_SEALING_MAXUPGRADINGSECTORS # env var: LOTUS_SEALING_MAXUPGRADINGSECTORS
#MaxUpgradingSectors = 0 #MaxUpgradingSectors = 0
# When set to a non-zero value, minimum number of epochs until sector expiration required for sectors to be considered
# for upgrades (0 = DealMinDuration = 180 days = 518400 epochs)
#
# Note that if all deals waiting in the input queue have lifetimes longer than this value, upgrade sectors will be
# required to have expiration of at least the soonest-ending deal
#
# type: uint64
# env var: LOTUS_SEALING_MINUPGRADESECTOREXPIRATION
#MinUpgradeSectorExpiration = 0
# When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
# be selected based on lowest initial pledge.
#
# Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
# selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
# where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
#
# Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
# initial pledge - upgrade sectors will always be chosen based on longest expiration
#
# type: uint64
# env var: LOTUS_SEALING_MINTARGETUPGRADESECTOREXPIRATION
#MinTargetUpgradeSectorExpiration = 0
# CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will # CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
# live before it must be extended or converted into sector containing deals before it is # live before it must be extended or converted into sector containing deals before it is
# terminated. Value must be between 180-540 days inclusive # terminated. Value must be between 180-540 days inclusive

View File

@ -922,6 +922,30 @@ flow when the volume of storage deals is lower.`,
Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)`, Comment: `Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)`,
}, },
{
Name: "MinUpgradeSectorExpiration",
Type: "uint64",
Comment: `When set to a non-zero value, minimum number of epochs until sector expiration required for sectors to be considered
for upgrades (0 = DealMinDuration = 180 days = 518400 epochs)
Note that if all deals waiting in the input queue have lifetimes longer than this value, upgrade sectors will be
required to have expiration of at least the soonest-ending deal`,
},
{
Name: "MinTargetUpgradeSectorExpiration",
Type: "uint64",
Comment: `When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
be selected based on lowest initial pledge.
Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
initial pledge - upgrade sectors will always be chosen based on longest expiration`,
},
{ {
Name: "CommittedCapacitySectorLifetime", Name: "CommittedCapacitySectorLifetime",
Type: "Duration", Type: "Duration",

View File

@ -319,6 +319,24 @@ type SealingConfig struct {
// Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals) // Upper bound on how many sectors can be sealing+upgrading at the same time when upgrading CC sectors with deals (0 = MaxSealingSectorsForDeals)
MaxUpgradingSectors uint64 MaxUpgradingSectors uint64
// When set to a non-zero value, minimum number of epochs until sector expiration required for sectors to be considered
// for upgrades (0 = DealMinDuration = 180 days = 518400 epochs)
//
// Note that if all deals waiting in the input queue have lifetimes longer than this value, upgrade sectors will be
// required to have expiration of at least the soonest-ending deal
MinUpgradeSectorExpiration uint64
// When set to a non-zero value, minimum number of epochs until sector expiration above which upgrade candidates will
// be selected based on lowest initial pledge.
//
// Target sector expiration is calculated by looking at the input deal queue, sorting it by deal expiration, and
// selecting N deals from the queue up to sector size. The target expiration will be Nth deal end epoch, or in case
// where there weren't enough deals to fill a sector, DealMaxDuration (540 days = 1555200 epochs)
//
// Setting this to a high value (for example to maximum deal duration - 1555200) will disable selection based on
// initial pledge - upgrade sectors will always be chosen based on longest expiration
MinTargetUpgradeSectorExpiration uint64
// CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will // CommittedCapacitySectorLifetime is the duration a Committed Capacity (CC) sector will
// live before it must be extended or converted into sector containing deals before it is // live before it must be extended or converted into sector containing deals before it is
// terminated. Value must be between 180-540 days inclusive // terminated. Value must be between 180-540 days inclusive

View File

@ -983,17 +983,19 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
return func(cfg sealiface.Config) (err error) { return func(cfg sealiface.Config) (err error) {
err = mutateSealingCfg(r, func(c config.SealingConfiger) { err = mutateSealingCfg(r, func(c config.SealingConfiger) {
newCfg := config.SealingConfig{ newCfg := config.SealingConfig{
MaxWaitDealsSectors: cfg.MaxWaitDealsSectors, MaxWaitDealsSectors: cfg.MaxWaitDealsSectors,
MaxSealingSectors: cfg.MaxSealingSectors, MaxSealingSectors: cfg.MaxSealingSectors,
MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals, PreferNewSectorsForDeals: cfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: cfg.MaxUpgradingSectors, MaxUpgradingSectors: cfg.MaxUpgradingSectors,
CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime),
WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), WaitDealsDelay: config.Duration(cfg.WaitDealsDelay),
MakeNewSectorForDeals: cfg.MakeNewSectorForDeals, MakeNewSectorForDeals: cfg.MakeNewSectorForDeals,
MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, MinUpgradeSectorExpiration: cfg.MinUpgradeSectorExpiration,
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy, MinTargetUpgradeSectorExpiration: cfg.MinTargetUpgradeSectorExpiration,
FinalizeEarly: cfg.FinalizeEarly, MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable,
AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy,
FinalizeEarly: cfg.FinalizeEarly,
CollateralFromMinerBalance: cfg.CollateralFromMinerBalance, CollateralFromMinerBalance: cfg.CollateralFromMinerBalance,
AvailableBalanceBuffer: types.FIL(cfg.AvailableBalanceBuffer), AvailableBalanceBuffer: types.FIL(cfg.AvailableBalanceBuffer),
@ -1024,11 +1026,14 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error
func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.SealingConfig) sealiface.Config { func ToSealingConfig(dealmakingCfg config.DealmakingConfig, sealingCfg config.SealingConfig) sealiface.Config {
return sealiface.Config{ return sealiface.Config{
MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors, MaxWaitDealsSectors: sealingCfg.MaxWaitDealsSectors,
MaxSealingSectors: sealingCfg.MaxSealingSectors, MaxSealingSectors: sealingCfg.MaxSealingSectors,
MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals, MaxSealingSectorsForDeals: sealingCfg.MaxSealingSectorsForDeals,
PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals, PreferNewSectorsForDeals: sealingCfg.PreferNewSectorsForDeals,
MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors, MinUpgradeSectorExpiration: sealingCfg.MinUpgradeSectorExpiration,
MinTargetUpgradeSectorExpiration: sealingCfg.MinTargetUpgradeSectorExpiration,
MaxUpgradingSectors: sealingCfg.MaxUpgradingSectors,
StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer), StartEpochSealingBuffer: abi.ChainEpoch(dealmakingCfg.StartEpochSealingBuffer),
MakeNewSectorForDeals: sealingCfg.MakeNewSectorForDeals, MakeNewSectorForDeals: sealingCfg.MakeNewSectorForDeals,
CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime), CommittedCapacitySectorLifetime: time.Duration(sealingCfg.CommittedCapacitySectorLifetime),

View File

@ -396,7 +396,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
e abi.ChainEpoch e abi.ChainEpoch
p abi.TokenAmount p abi.TokenAmount
}) })
expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) { getExpirationCached := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
if e, ok := memo[sn]; ok { if e, ok := memo[sn]; ok {
return e.e, e.p, nil return e.e, e.p, nil
} }
@ -440,13 +440,13 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
// check that sector lifetime is long enough to fit deal using latest expiration from on chain // check that sector lifetime is long enough to fit deal using latest expiration from on chain
ok, err := sector.dealFitsInLifetime(piece.deal.DealProposal.EndEpoch, expF) ok, err := sector.dealFitsInLifetime(piece.deal.DealProposal.EndEpoch, getExpirationCached)
if err != nil { if err != nil {
log.Errorf("failed to check expiration for cc Update sector %d", sector.number) log.Errorf("failed to check expiration for cc Update sector %d", sector.number)
continue continue
} }
if !ok { if !ok {
exp, _, _ := expF(sector.number) exp, _, _ := getExpirationCached(sector.number)
log.Debugf("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch) log.Debugf("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
continue continue
} }
@ -513,7 +513,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
if len(toAssign) > 0 { if len(toAssign) > 0 {
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors) log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
if err := m.tryGetDealSector(ctx, sp, expF); err != nil { if err := m.tryGetDealSector(ctx, sp, getExpirationCached); err != nil {
log.Errorw("Failed to create a new sector for deals", "error", err) log.Errorw("Failed to create a new sector for deals", "error", err)
} }
} }
@ -521,7 +521,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
return nil return nil
} }
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) { func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize, cfg sealiface.Config) (minExpEpoch, targetEpoch abi.ChainEpoch, err error) {
var candidates []*pendingPiece var candidates []*pendingPiece
for _, piece := range m.pendingPieces { for _, piece := range m.pendingPieces {
@ -537,11 +537,16 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
}) })
var totalBytes uint64 var totalBytes uint64
var full bool
// Find the expiration of the last deal which can fit into the sector, use that as the initial target
for _, candidate := range candidates { for _, candidate := range candidates {
totalBytes += uint64(candidate.size) totalBytes += uint64(candidate.size)
targetEpoch = candidate.deal.DealProposal.EndEpoch
if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) { if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil full = true
break
} }
} }
@ -550,12 +555,31 @@ func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize
return 0, 0, xerrors.Errorf("getting current epoch: %w", err) return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
} }
minDur, maxDur := policy.DealDurationBounds(0) // if the sector isn't full, use max deal duration as the target
if !full {
minDur, maxDur := policy.DealDurationBounds(0)
return ts.Height() + minDur, ts.Height() + maxDur, nil minExpEpoch = ts.Height() + minDur
targetEpoch = ts.Height() + maxDur
}
// make sure that at least one deal in the queue is within the expiration
if len(candidates) > 0 && candidates[0].deal.DealProposal.EndEpoch > minExpEpoch {
minExpEpoch = candidates[0].deal.DealProposal.EndEpoch
}
// apply user minimums
if abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)+ts.Height() > minExpEpoch {
minExpEpoch = abi.ChainEpoch(cfg.MinUpgradeSectorExpiration) + ts.Height()
}
if abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)+ts.Height() > targetEpoch {
targetEpoch = abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration) + ts.Height()
}
return minExpEpoch, targetEpoch, nil
} }
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, cfg sealiface.Config, ef expFn) (bool, error) {
if len(m.available) == 0 { if len(m.available) == 0 {
return false, nil return false, nil
} }
@ -564,7 +588,7 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
if err != nil { if err != nil {
return false, xerrors.Errorf("getting sector size: %w", err) return false, xerrors.Errorf("getting sector size: %w", err)
} }
minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize) minExpirationEpoch, targetExpirationEpoch, err := m.calcTargetExpiration(ctx, ssize, cfg)
if err != nil { if err != nil {
return false, xerrors.Errorf("calculating min target expiration: %w", err) return false, xerrors.Errorf("calculating min target expiration: %w", err)
} }
@ -574,7 +598,7 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
bestPledge := types.TotalFilecoinInt bestPledge := types.TotalFilecoinInt
for s := range m.available { for s := range m.available {
expiration, pledge, err := ef(s.Number) expirationEpoch, pledge, err := ef(s.Number)
if err != nil { if err != nil {
log.Errorw("checking sector expiration", "error", err) log.Errorw("checking sector expiration", "error", err)
continue continue
@ -596,24 +620,24 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
// if best is below target, we want larger expirations // if best is below target, we want larger expirations
// if best is above target, we want lower pledge, but only if still above target // if best is above target, we want lower pledge, but only if still above target
if bestExpiration < targetExpiration { if bestExpiration < targetExpirationEpoch {
if expiration > bestExpiration && slowChecks(s.Number) { if expirationEpoch > bestExpiration && slowChecks(s.Number) {
bestExpiration = expiration bestExpiration = expirationEpoch
bestPledge = pledge bestPledge = pledge
candidate = s candidate = s
} }
continue continue
} }
if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) { if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
bestExpiration = expiration bestExpiration = expirationEpoch
bestPledge = pledge bestPledge = pledge
candidate = s candidate = s
} }
} }
if bestExpiration < minExpiration { if bestExpiration < minExpirationEpoch {
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate) log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpirationEpoch, "min", minExpirationEpoch, "candidate", candidate)
// didn't find a good sector / no sectors were available // didn't find a good sector / no sectors were available
return false, nil return false, nil
} }
@ -682,7 +706,7 @@ func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealPro
"shouldUpgrade", shouldUpgrade) "shouldUpgrade", shouldUpgrade)
if shouldUpgrade { if shouldUpgrade {
got, err := m.maybeUpgradeSector(ctx, sp, ef) got, err := m.maybeUpgradeSector(ctx, sp, cfg, ef)
if err != nil { if err != nil {
return err return err
} }

View File

@ -20,6 +20,10 @@ type Config struct {
PreferNewSectorsForDeals bool PreferNewSectorsForDeals bool
MinUpgradeSectorExpiration uint64
MinTargetUpgradeSectorExpiration uint64
MaxUpgradingSectors uint64 MaxUpgradingSectors uint64
MakeNewSectorForDeals bool MakeNewSectorForDeals bool

View File

@ -21,6 +21,11 @@ import (
) )
func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error { func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error {
// if the sector ended up not having any deals, abort the upgrade
if !sector.hasDeals() {
return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")})
}
if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state
return handleErrors(ctx, err, sector) return handleErrors(ctx, err, sector)
} }
@ -297,6 +302,6 @@ func handleErrors(ctx statemachine.Context, err error, sector SectorInfo) error
case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? case *ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)}) return ctx.Send(SectorDealsExpired{xerrors.Errorf("expired dealIDs in sector: %w", err)})
default: default:
return xerrors.Errorf("checkPieces sanity check error: %w", err) return xerrors.Errorf("checkPieces sanity check error: %w (%+v)", err, err)
} }
} }

View File

@ -49,6 +49,11 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber)) delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
m.inputLk.Unlock() m.inputLk.Unlock()
// if this is a snapdeals sector, but it ended up not having any deals, abort the upgrade
if sector.State == SnapDealsPacking && !sector.hasDeals() {
return ctx.Send(SectorAbortUpgrade{xerrors.New("sector had no deals")})
}
log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber) log.Infow("performing filling up rest of the sector...", "sector", sector.SectorNumber)
var allocated abi.UnpaddedPieceSize var allocated abi.UnpaddedPieceSize