sealing pipeline: Prepare deal assigning logic for FIP-45
This commit is contained in:
parent
752d58906c
commit
0af9888b12
@ -12,6 +12,7 @@ import (
|
||||
"github.com/filecoin-project/go-padreader"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
"github.com/filecoin-project/go-statemachine"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
@ -28,8 +29,13 @@ import (
|
||||
|
||||
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
||||
var used abi.UnpaddedPieceSize
|
||||
var lastDealEnd abi.ChainEpoch
|
||||
for _, piece := range sector.Pieces {
|
||||
used += piece.Piece.Size.Unpadded()
|
||||
|
||||
if piece.DealInfo.DealProposal.EndEpoch > lastDealEnd {
|
||||
lastDealEnd = piece.DealInfo.DealProposal.EndEpoch
|
||||
}
|
||||
}
|
||||
|
||||
m.inputLk.Lock()
|
||||
@ -72,6 +78,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e
|
||||
// (note that m.assignedPieces[sid] will always be empty here)
|
||||
m.openSectors[sid].used = used
|
||||
}
|
||||
m.openSectors[sid].lastDealEnd = lastDealEnd
|
||||
|
||||
go func() {
|
||||
defer m.inputLk.Unlock()
|
||||
@ -314,6 +321,8 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
|
||||
deal.DealProposal.PieceCID, ts.Height(), deal.DealProposal.StartEpoch)
|
||||
}
|
||||
|
||||
claimTerms, err := m.getClaimTerms(ctx, deal)
|
||||
|
||||
m.inputLk.Lock()
|
||||
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
|
||||
m.inputLk.Unlock()
|
||||
@ -332,7 +341,7 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
|
||||
}
|
||||
|
||||
// addPendingPiece takes over m.inputLk
|
||||
pp := m.addPendingPiece(ctx, size, data, deal, sp)
|
||||
pp := m.addPendingPiece(ctx, size, data, deal, claimTerms, sp)
|
||||
|
||||
res, err := waitAddPieceResp(ctx, pp)
|
||||
if err != nil {
|
||||
@ -341,14 +350,25 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec
|
||||
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
|
||||
}
|
||||
|
||||
func (m *Sealing) getClaimTerms(ctx context.Context, deal api.PieceDealInfo) (pieceClaimBounds, error) {
|
||||
// TODO: TODO! TODO! get the real claim bounds here
|
||||
|
||||
return pieceClaimBounds{
|
||||
claimTermEnd: deal.DealProposal.EndEpoch + policy.GetSectorMaxLifetime(abi.RegisteredSealProof_StackedDrg32GiBV1_1, network.Version17),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// called with m.inputLk; transfers the lock to another goroutine!
|
||||
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece {
|
||||
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo, ct pieceClaimBounds, sp abi.RegisteredSealProof) *pendingPiece {
|
||||
doneCh := make(chan struct{})
|
||||
pp := &pendingPiece{
|
||||
size: size,
|
||||
deal: deal,
|
||||
claimTerms: ct,
|
||||
|
||||
data: data,
|
||||
|
||||
doneCh: doneCh,
|
||||
size: size,
|
||||
deal: deal,
|
||||
data: data,
|
||||
assigned: false,
|
||||
}
|
||||
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
|
||||
@ -420,6 +440,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
sector abi.SectorID
|
||||
deal cid.Cid
|
||||
|
||||
dealEnd abi.ChainEpoch
|
||||
claimTermEnd abi.ChainEpoch
|
||||
|
||||
size abi.UnpaddedPieceSize
|
||||
padding abi.UnpaddedPieceSize
|
||||
}
|
||||
@ -440,13 +463,15 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
|
||||
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
|
||||
|
||||
ok, err := sector.dealFitsInLifetime(piece.deal.DealProposal.EndEpoch, getExpirationCached)
|
||||
ok, err := sector.checkDealAssignable(piece, getExpirationCached)
|
||||
if err != nil {
|
||||
log.Errorf("failed to check expiration for cc Update sector %d", sector.number)
|
||||
continue
|
||||
}
|
||||
if !ok {
|
||||
exp, _, _ := getExpirationCached(sector.number)
|
||||
|
||||
// todo move this log into checkDealAssignable, make more detailed about the reason
|
||||
log.Debugf("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
|
||||
continue
|
||||
}
|
||||
@ -456,6 +481,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
sector: id,
|
||||
deal: proposalCid,
|
||||
|
||||
dealEnd: piece.deal.DealProposal.EndEpoch,
|
||||
claimTermEnd: piece.claimTerms.claimTermEnd,
|
||||
|
||||
size: piece.size,
|
||||
padding: avail % piece.size,
|
||||
})
|
||||
@ -463,6 +491,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
}
|
||||
}
|
||||
sort.Slice(matches, func(i, j int) bool {
|
||||
// todo maybe sort by expiration
|
||||
|
||||
if matches[i].padding != matches[j].padding { // less padding is better
|
||||
return matches[i].padding < matches[j].padding
|
||||
}
|
||||
@ -487,18 +517,29 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
continue
|
||||
}
|
||||
|
||||
// late checks
|
||||
|
||||
avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used
|
||||
|
||||
if mt.size > avail {
|
||||
continue
|
||||
}
|
||||
|
||||
if m.openSectors[mt.sector].lastDealEnd > mt.claimTermEnd {
|
||||
continue
|
||||
}
|
||||
|
||||
// assign the piece!
|
||||
|
||||
err := m.openSectors[mt.sector].maybeAccept(mt.deal)
|
||||
if err != nil {
|
||||
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
|
||||
}
|
||||
|
||||
m.openSectors[mt.sector].used += mt.padding + mt.size
|
||||
if mt.dealEnd > m.openSectors[mt.sector].lastDealEnd {
|
||||
m.openSectors[mt.sector].lastDealEnd = mt.dealEnd
|
||||
}
|
||||
|
||||
m.pendingPieces[mt.deal].assigned = true
|
||||
delete(toAssign, mt.deal)
|
||||
@ -521,62 +562,66 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize, cfg sealiface.Config) (minExpEpoch, targetEpoch abi.ChainEpoch, err error) {
|
||||
var candidates []*pendingPiece
|
||||
// pendingPieceIndex is an index in the Sealing.pendingPieces map
|
||||
type pendingPieceIndex cid.Cid
|
||||
|
||||
for _, piece := range m.pendingPieces {
|
||||
if piece.assigned {
|
||||
continue // already assigned to a sector, skip
|
||||
type pieceBound struct {
|
||||
epoch abi.ChainEpoch
|
||||
|
||||
// boundStart marks deal /end/ epoch; only deals with boundStart lower or equal to expiration of a given sector can be
|
||||
// put into that sector
|
||||
boundStart []pendingPieceIndex
|
||||
|
||||
// boundEnd marks deal claim TermMax; only deals with boundEnd higher or equal to expiration of a given sector can be
|
||||
// put into that sector
|
||||
boundEnd []pendingPieceIndex
|
||||
|
||||
dealBytesInBound abi.UnpaddedPieceSize
|
||||
}
|
||||
|
||||
func (m *Sealing) pendingPieceEpochBounds() []pieceBound {
|
||||
boundsByEpoch := map[abi.ChainEpoch]*pieceBound{}
|
||||
|
||||
for ppi, piece := range m.pendingPieces {
|
||||
// start bound on deal end
|
||||
if boundsByEpoch[piece.deal.DealProposal.EndEpoch] == nil {
|
||||
boundsByEpoch[piece.deal.DealProposal.EndEpoch] = &pieceBound{
|
||||
epoch: piece.deal.DealProposal.EndEpoch,
|
||||
}
|
||||
}
|
||||
candidates = append(candidates, piece)
|
||||
boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart = append(boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart, pendingPieceIndex(ppi))
|
||||
|
||||
// end bound on term max
|
||||
if boundsByEpoch[piece.claimTerms.claimTermEnd] == nil {
|
||||
boundsByEpoch[piece.claimTerms.claimTermEnd] = &pieceBound{
|
||||
epoch: piece.claimTerms.claimTermEnd,
|
||||
}
|
||||
}
|
||||
boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd = append(boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd, pendingPieceIndex(ppi))
|
||||
}
|
||||
|
||||
// earliest expiration first
|
||||
sort.Slice(candidates, func(i, j int) bool {
|
||||
return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch
|
||||
out := make([]pieceBound, 0, len(boundsByEpoch))
|
||||
for _, bound := range boundsByEpoch {
|
||||
out = append(out, *bound)
|
||||
}
|
||||
|
||||
sort.Slice(out, func(i, j int) bool {
|
||||
return out[i].epoch < out[j].epoch
|
||||
})
|
||||
|
||||
var totalBytes uint64
|
||||
var full bool
|
||||
|
||||
// Find the expiration of the last deal which can fit into the sector, use that as the initial target
|
||||
for _, candidate := range candidates {
|
||||
totalBytes += uint64(candidate.size)
|
||||
targetEpoch = candidate.deal.DealProposal.EndEpoch
|
||||
|
||||
if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) {
|
||||
full = true
|
||||
break
|
||||
var curBoundBytes abi.UnpaddedPieceSize
|
||||
for i, bound := range out {
|
||||
for _, ppi := range bound.boundStart {
|
||||
curBoundBytes += m.pendingPieces[cid.Cid(ppi)].size
|
||||
}
|
||||
for _, ppi := range bound.boundEnd {
|
||||
curBoundBytes -= m.pendingPieces[cid.Cid(ppi)].size
|
||||
}
|
||||
|
||||
out[i].dealBytesInBound = curBoundBytes
|
||||
}
|
||||
|
||||
ts, err := m.Api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return 0, 0, xerrors.Errorf("getting current epoch: %w", err)
|
||||
}
|
||||
|
||||
// if the sector isn't full, use max deal duration as the target
|
||||
if !full {
|
||||
minDur, maxDur := policy.DealDurationBounds(0)
|
||||
|
||||
minExpEpoch = ts.Height() + minDur
|
||||
targetEpoch = ts.Height() + maxDur
|
||||
}
|
||||
|
||||
// make sure that at least one deal in the queue is within the expiration
|
||||
if len(candidates) > 0 && candidates[0].deal.DealProposal.EndEpoch > minExpEpoch {
|
||||
minExpEpoch = candidates[0].deal.DealProposal.EndEpoch
|
||||
}
|
||||
|
||||
// apply user minimums
|
||||
if abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)+ts.Height() > minExpEpoch {
|
||||
minExpEpoch = abi.ChainEpoch(cfg.MinUpgradeSectorExpiration) + ts.Height()
|
||||
}
|
||||
if abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)+ts.Height() > targetEpoch {
|
||||
targetEpoch = abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration) + ts.Height()
|
||||
}
|
||||
|
||||
return minExpEpoch, targetEpoch, nil
|
||||
return out
|
||||
}
|
||||
|
||||
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, cfg sealiface.Config, ef expFn) (bool, error) {
|
||||
@ -584,17 +629,37 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
|
||||
return false, nil
|
||||
}
|
||||
|
||||
ts, err := m.Api.ChainHead(ctx)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
ssize, err := sp.SectorSize()
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("getting sector size: %w", err)
|
||||
return false, err
|
||||
}
|
||||
minExpirationEpoch, targetExpirationEpoch, err := m.calcTargetExpiration(ctx, ssize, cfg)
|
||||
if err != nil {
|
||||
return false, xerrors.Errorf("calculating min target expiration: %w", err)
|
||||
|
||||
pieceBounds := m.pendingPieceEpochBounds()
|
||||
|
||||
findBound := func(sectorExp abi.ChainEpoch) *pieceBound {
|
||||
if len(pieceBounds) == 0 {
|
||||
return nil
|
||||
}
|
||||
f := sort.Search(len(pieceBounds), func(i int) bool {
|
||||
return sectorExp <= pieceBounds[i].epoch
|
||||
})
|
||||
if f == 0 {
|
||||
// all piece bounds are after sector expiration
|
||||
return nil
|
||||
}
|
||||
return &pieceBounds[f-1]
|
||||
}
|
||||
|
||||
targetExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)
|
||||
minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)
|
||||
|
||||
var candidate abi.SectorID
|
||||
var bestExpiration abi.ChainEpoch
|
||||
var bestDealBytes abi.PaddedPieceSize
|
||||
bestPledge := types.TotalFilecoinInt
|
||||
|
||||
for s := range m.available {
|
||||
@ -611,19 +676,43 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
|
||||
return false
|
||||
}
|
||||
if !active {
|
||||
log.Debugw("skipping available sector", "reason", "not active")
|
||||
log.Debugw("skipping available sector", "sector", sid, "reason", "not active")
|
||||
return false
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
if expirationEpoch < minExpirationEpoch {
|
||||
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below MinUpgradeSectorExpiration")
|
||||
}
|
||||
|
||||
pb := findBound(expirationEpoch)
|
||||
if pb == nil {
|
||||
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below deal bounds")
|
||||
continue
|
||||
}
|
||||
|
||||
// if the sector has less than one sector worth of candidate deals, and
|
||||
// the best candidate has more candidate deals, this sector isn't better
|
||||
if pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) {
|
||||
if bestDealBytes > pb.dealBytesInBound.Padded() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
// if best is below target, we want larger expirations
|
||||
// if best is above target, we want lower pledge, but only if still above target
|
||||
|
||||
// todo: after nv17 "target expiration" doesn't really make that much sense
|
||||
// (tho to be fair it doesn't make too much sense now either)
|
||||
// we probably want the lowest expiration that's still above the configured
|
||||
// minimum, and has can fit most candidate deals
|
||||
|
||||
if bestExpiration < targetExpirationEpoch {
|
||||
if expirationEpoch > bestExpiration && slowChecks(s.Number) {
|
||||
bestExpiration = expirationEpoch
|
||||
bestPledge = pledge
|
||||
bestDealBytes = pb.dealBytesInBound.Padded()
|
||||
candidate = s
|
||||
}
|
||||
continue
|
||||
@ -632,6 +721,7 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP
|
||||
if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
|
||||
bestExpiration = expirationEpoch
|
||||
bestPledge = pledge
|
||||
bestDealBytes = pb.dealBytesInBound.Padded()
|
||||
candidate = s
|
||||
}
|
||||
}
|
||||
|
@ -131,22 +131,35 @@ type Sealing struct {
|
||||
}
|
||||
|
||||
type openSector struct {
|
||||
used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors
|
||||
number abi.SectorNumber
|
||||
ccUpdate bool
|
||||
used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors
|
||||
lastDealEnd abi.ChainEpoch
|
||||
number abi.SectorNumber
|
||||
ccUpdate bool
|
||||
|
||||
maybeAccept func(cid.Cid) error // called with inputLk
|
||||
}
|
||||
|
||||
func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) {
|
||||
func (o *openSector) checkDealAssignable(piece *pendingPiece, expF expFn) (bool, error) {
|
||||
// if there are deals assigned, check that no assigned deal expires after termMax
|
||||
if o.lastDealEnd > piece.claimTerms.claimTermEnd {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
// check that in case of upgrade sectors, sector expiration is at least deal expiration
|
||||
if !o.ccUpdate {
|
||||
return true, nil
|
||||
}
|
||||
expiration, _, err := expF(o.number)
|
||||
sectorExpiration, _, err := expF(o.number)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
return expiration >= dealEnd, nil
|
||||
|
||||
// check that in case of upgrade sector, it's expiration isn't above deals claim TermMax
|
||||
if sectorExpiration > piece.claimTerms.claimTermEnd {
|
||||
return false, nil
|
||||
}
|
||||
|
||||
return sectorExpiration >= piece.deal.DealProposal.EndEpoch, nil
|
||||
}
|
||||
|
||||
type pieceAcceptResp struct {
|
||||
@ -155,6 +168,11 @@ type pieceAcceptResp struct {
|
||||
err error
|
||||
}
|
||||
|
||||
type pieceClaimBounds struct {
|
||||
// dealStart + termMax
|
||||
claimTermEnd abi.ChainEpoch
|
||||
}
|
||||
|
||||
type pendingPiece struct {
|
||||
doneCh chan struct{}
|
||||
resp *pieceAcceptResp
|
||||
@ -162,6 +180,8 @@ type pendingPiece struct {
|
||||
size abi.UnpaddedPieceSize
|
||||
deal api.PieceDealInfo
|
||||
|
||||
claimTerms pieceClaimBounds
|
||||
|
||||
data storiface.Data
|
||||
|
||||
assigned bool // assigned to a sector?
|
||||
|
Loading…
Reference in New Issue
Block a user