diff --git a/api/api_full.go b/api/api_full.go index 1ec1c22be..320a20687 100644 --- a/api/api_full.go +++ b/api/api_full.go @@ -528,7 +528,8 @@ type FullNode interface { StateMarketDeals(context.Context, types.TipSetKey) (map[string]*MarketDeal, error) //perm:read // StateMarketStorageDeal returns information about the indicated deal StateMarketStorageDeal(context.Context, abi.DealID, types.TipSetKey) (*MarketDeal, error) //perm:read - // StateGetAllocationForPendingDeal returns the allocation for a given deal ID of a pending deal. + // StateGetAllocationForPendingDeal returns the allocation for a given deal ID of a pending deal. Returns nil if + // pending allocation is not found. StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error) //perm:read // StateGetAllocation returns the allocation for a given address and allocation ID. StateGetAllocation(ctx context.Context, clientAddr address.Address, allocationId verifregtypes.AllocationId, tsk types.TipSetKey) (*verifregtypes.Allocation, error) //perm:read diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index aec7b5f31..20862916a 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/chain/actors/builtin/market/state.go.template b/chain/actors/builtin/market/state.go.template index 6a36828e6..50abdeb6b 100644 --- a/chain/actors/builtin/market/state.go.template +++ b/chain/actors/builtin/market/state.go.template @@ -368,21 +368,22 @@ func (r *publishStorageDealsReturn{{.v}}) DealIDs() ([]abi.DealID, error) { func (s *state{{.v}}) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { {{if (le .v 8)}} - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") {{else}} allocations, err := adt9.AsMap(s.store, s.PendingDealAllocationIds, builtin.DefaultHamtBitwidth) if err != nil { - return 0, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) + return verifregtypes.NoAllocationID, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) } var allocationId cbg.CborInt found, err := allocations.Get(abi.UIntKey(uint64(dealId)), &allocationId) if err != nil { - return 0, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) + return verifregtypes.NoAllocationID, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) } if !found { - return 0, xerrors.Errorf("failed to find allocation id for %d", dealId) + return verifregtypes.NoAllocationID, nil } + return verifregtypes.AllocationId(allocationId), nil {{end}} } diff --git a/chain/actors/builtin/market/v0.go b/chain/actors/builtin/market/v0.go index 6fffa8697..aa68049bb 100644 --- a/chain/actors/builtin/market/v0.go +++ b/chain/actors/builtin/market/v0.go @@ -304,6 +304,6 @@ func (r *publishStorageDealsReturn0) DealIDs() ([]abi.DealID, error) { func (s *state0) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v2.go b/chain/actors/builtin/market/v2.go index be1c08c5e..777a17cd0 100644 --- a/chain/actors/builtin/market/v2.go +++ b/chain/actors/builtin/market/v2.go @@ -304,6 +304,6 @@ func (r *publishStorageDealsReturn2) DealIDs() ([]abi.DealID, error) { func (s *state2) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v3.go b/chain/actors/builtin/market/v3.go index 518a6e654..5ca09fdfb 100644 --- a/chain/actors/builtin/market/v3.go +++ b/chain/actors/builtin/market/v3.go @@ -299,6 +299,6 @@ func (r *publishStorageDealsReturn3) DealIDs() ([]abi.DealID, error) { func (s *state3) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v4.go b/chain/actors/builtin/market/v4.go index c3d50cf3b..23422ec31 100644 --- a/chain/actors/builtin/market/v4.go +++ b/chain/actors/builtin/market/v4.go @@ -299,6 +299,6 @@ func (r *publishStorageDealsReturn4) DealIDs() ([]abi.DealID, error) { func (s *state4) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v5.go b/chain/actors/builtin/market/v5.go index 23394eb2d..8e8833c37 100644 --- a/chain/actors/builtin/market/v5.go +++ b/chain/actors/builtin/market/v5.go @@ -299,6 +299,6 @@ func (r *publishStorageDealsReturn5) DealIDs() ([]abi.DealID, error) { func (s *state5) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v6.go b/chain/actors/builtin/market/v6.go index 69af9abcc..d86f73108 100644 --- a/chain/actors/builtin/market/v6.go +++ b/chain/actors/builtin/market/v6.go @@ -317,6 +317,6 @@ func (r *publishStorageDealsReturn6) DealIDs() ([]abi.DealID, error) { func (s *state6) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v7.go b/chain/actors/builtin/market/v7.go index ea6957b73..5f6547e3f 100644 --- a/chain/actors/builtin/market/v7.go +++ b/chain/actors/builtin/market/v7.go @@ -317,6 +317,6 @@ func (r *publishStorageDealsReturn7) DealIDs() ([]abi.DealID, error) { func (s *state7) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v8.go b/chain/actors/builtin/market/v8.go index c0b155fb9..5f3b690bb 100644 --- a/chain/actors/builtin/market/v8.go +++ b/chain/actors/builtin/market/v8.go @@ -334,6 +334,6 @@ func (r *publishStorageDealsReturn8) DealIDs() ([]abi.DealID, error) { func (s *state8) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes.AllocationId, error) { - return 0, xerrors.Errorf("unsupported before actors v9") + return verifregtypes.NoAllocationID, xerrors.Errorf("unsupported before actors v9") } diff --git a/chain/actors/builtin/market/v9.go b/chain/actors/builtin/market/v9.go index f781e05f0..9fed45ae8 100644 --- a/chain/actors/builtin/market/v9.go +++ b/chain/actors/builtin/market/v9.go @@ -332,17 +332,18 @@ func (s *state9) GetAllocationIdForPendingDeal(dealId abi.DealID) (verifregtypes allocations, err := adt9.AsMap(s.store, s.PendingDealAllocationIds, builtin.DefaultHamtBitwidth) if err != nil { - return 0, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) + return verifregtypes.NoAllocationID, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) } var allocationId cbg.CborInt found, err := allocations.Get(abi.UIntKey(uint64(dealId)), &allocationId) if err != nil { - return 0, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) + return verifregtypes.NoAllocationID, xerrors.Errorf("failed to load allocation id for %d: %w", dealId, err) } if !found { - return 0, xerrors.Errorf("failed to find allocation id for %d", dealId) + return verifregtypes.NoAllocationID, nil } + return verifregtypes.AllocationId(allocationId), nil } diff --git a/documentation/en/api-v1-unstable-methods.md b/documentation/en/api-v1-unstable-methods.md index 8a7bc3c29..075e844ec 100644 --- a/documentation/en/api-v1-unstable-methods.md +++ b/documentation/en/api-v1-unstable-methods.md @@ -5768,7 +5768,8 @@ Response: ``` ### StateGetAllocationForPendingDeal -StateGetAllocationForPendingDeal returns the allocation for a given deal ID of a pending deal. +StateGetAllocationForPendingDeal returns the allocation for a given deal ID of a pending deal. Returns nil if +pending allocation is not found. Perms: read diff --git a/go.mod b/go.mod index f3c073c70..b70ff949e 100644 --- a/go.mod +++ b/go.mod @@ -42,7 +42,7 @@ require ( github.com/filecoin-project/go-legs v0.4.4 github.com/filecoin-project/go-padreader v0.0.1 github.com/filecoin-project/go-paramfetch v0.0.4 - github.com/filecoin-project/go-state-types v0.9.0-rc1 + github.com/filecoin-project/go-state-types v0.9.0-rc2 github.com/filecoin-project/go-statemachine v1.0.2 github.com/filecoin-project/go-statestore v0.2.0 github.com/filecoin-project/go-storedcounter v0.1.0 diff --git a/go.sum b/go.sum index d41f75715..91d746106 100644 --- a/go.sum +++ b/go.sum @@ -343,8 +343,8 @@ github.com/filecoin-project/go-state-types v0.1.0/go.mod h1:ezYnPf0bNkTsDibL/psS github.com/filecoin-project/go-state-types v0.1.6/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.8/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= github.com/filecoin-project/go-state-types v0.1.10/go.mod h1:UwGVoMsULoCK+bWjEdd/xLCvLAQFBC7EDT477SKml+Q= -github.com/filecoin-project/go-state-types v0.9.0-rc1 h1:cK6OzKP11aIFt0fbor/lqvrzEBegbMGHnSr9LJN/r4g= -github.com/filecoin-project/go-state-types v0.9.0-rc1/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw= +github.com/filecoin-project/go-state-types v0.9.0-rc2 h1:HtSxHUEwAtpnhV/77/ugnUt9lwvTTcQ7PRV+5iTAcUw= +github.com/filecoin-project/go-state-types v0.9.0-rc2/go.mod h1:+HCZifUV+e8TlQkgll22Ucuiq8OrVJkK+4Kh4u75iiw= github.com/filecoin-project/go-statemachine v0.0.0-20200925024713-05bd7c71fbfe/go.mod h1:FGwQgZAt2Gh5mjlwJUlVB62JeYdo+if0xWxSEfBD9ig= github.com/filecoin-project/go-statemachine v1.0.2 h1:421SSWBk8GIoCoWYYTE/d+qCWccgmRH0uXotXRDjUbc= github.com/filecoin-project/go-statemachine v1.0.2/go.mod h1:jZdXXiHa61n4NmgWFG4w8tnqgvZVHYbJ3yW7+y8bF54= diff --git a/node/impl/full/state.go b/node/impl/full/state.go index 5a14447c3..e2c3438bb 100644 --- a/node/impl/full/state.go +++ b/node/impl/full/state.go @@ -786,6 +786,9 @@ func (a *StateAPI) StateGetAllocationForPendingDeal(ctx context.Context, dealId if err != nil { return nil, err } + if allocationId == verifregtypes.NoAllocationID { + return nil, nil + } dealState, err := a.StateMarketStorageDeal(ctx, dealId, tsk) if err != nil { diff --git a/storage/pipeline/input.go b/storage/pipeline/input.go index 824fee255..631e84455 100644 --- a/storage/pipeline/input.go +++ b/storage/pipeline/input.go @@ -12,6 +12,7 @@ import ( "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/lotus/api" @@ -28,8 +29,13 @@ import ( func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error { var used abi.UnpaddedPieceSize + var lastDealEnd abi.ChainEpoch for _, piece := range sector.Pieces { used += piece.Piece.Size.Unpadded() + + if piece.DealInfo != nil && piece.DealInfo.DealProposal.EndEpoch > lastDealEnd { + lastDealEnd = piece.DealInfo.DealProposal.EndEpoch + } } m.inputLk.Lock() @@ -72,6 +78,7 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e // (note that m.assignedPieces[sid] will always be empty here) m.openSectors[sid].used = used } + m.openSectors[sid].lastDealEnd = lastDealEnd go func() { defer m.inputLk.Unlock() @@ -314,6 +321,11 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec deal.DealProposal.PieceCID, ts.Height(), deal.DealProposal.StartEpoch) } + claimTerms, err := m.getClaimTerms(ctx, deal, ts.Key()) + if err != nil { + return api.SectorOffset{}, err + } + m.inputLk.Lock() if pp, exist := m.pendingPieces[proposalCID(deal)]; exist { m.inputLk.Unlock() @@ -332,7 +344,7 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec } // addPendingPiece takes over m.inputLk - pp := m.addPendingPiece(ctx, size, data, deal, sp) + pp := m.addPendingPiece(ctx, size, data, deal, claimTerms, sp) res, err := waitAddPieceResp(ctx, pp) if err != nil { @@ -341,14 +353,41 @@ func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPiec return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err } +func (m *Sealing) getClaimTerms(ctx context.Context, deal api.PieceDealInfo, tsk types.TipSetKey) (pieceClaimBounds, error) { + nv, err := m.Api.StateNetworkVersion(ctx, tsk) + if err != nil { + return pieceClaimBounds{}, err + } + + if nv >= network.Version17 { + all, err := m.Api.StateGetAllocationForPendingDeal(ctx, deal.DealID, tsk) + if err != nil { + return pieceClaimBounds{}, err + } + if all != nil { + return pieceClaimBounds{ + claimTermEnd: deal.DealProposal.StartEpoch + all.TermMax, + }, nil + } + } + + // no allocation for this deal, so just use a really high number for "term end" + return pieceClaimBounds{ + claimTermEnd: deal.DealProposal.EndEpoch + policy.GetSectorMaxLifetime(abi.RegisteredSealProof_StackedDrg32GiBV1_1, network.Version17), + }, nil +} + // called with m.inputLk; transfers the lock to another goroutine! -func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo, sp abi.RegisteredSealProof) *pendingPiece { +func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo, ct pieceClaimBounds, sp abi.RegisteredSealProof) *pendingPiece { doneCh := make(chan struct{}) pp := &pendingPiece{ + size: size, + deal: deal, + claimTerms: ct, + + data: data, + doneCh: doneCh, - size: size, - deal: deal, - data: data, assigned: false, } pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) { @@ -420,6 +459,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e sector abi.SectorID deal cid.Cid + dealEnd abi.ChainEpoch + claimTermEnd abi.ChainEpoch + size abi.UnpaddedPieceSize padding abi.UnpaddedPieceSize } @@ -440,13 +482,15 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used // check that sector lifetime is long enough to fit deal using latest expiration from on chain - ok, err := sector.dealFitsInLifetime(piece.deal.DealProposal.EndEpoch, getExpirationCached) + ok, err := sector.checkDealAssignable(piece, getExpirationCached) if err != nil { log.Errorf("failed to check expiration for cc Update sector %d", sector.number) continue } if !ok { exp, _, _ := getExpirationCached(sector.number) + + // todo move this log into checkDealAssignable, make more detailed about the reason log.Debugf("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch) continue } @@ -456,6 +500,9 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e sector: id, deal: proposalCid, + dealEnd: piece.deal.DealProposal.EndEpoch, + claimTermEnd: piece.claimTerms.claimTermEnd, + size: piece.size, padding: avail % piece.size, }) @@ -463,6 +510,8 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e } } sort.Slice(matches, func(i, j int) bool { + // todo maybe sort by expiration + if matches[i].padding != matches[j].padding { // less padding is better return matches[i].padding < matches[j].padding } @@ -487,18 +536,29 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e continue } + // late checks + avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used if mt.size > avail { continue } + if m.openSectors[mt.sector].lastDealEnd > mt.claimTermEnd { + continue + } + + // assign the piece! + err := m.openSectors[mt.sector].maybeAccept(mt.deal) if err != nil { m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece } m.openSectors[mt.sector].used += mt.padding + mt.size + if mt.dealEnd > m.openSectors[mt.sector].lastDealEnd { + m.openSectors[mt.sector].lastDealEnd = mt.dealEnd + } m.pendingPieces[mt.deal].assigned = true delete(toAssign, mt.deal) @@ -521,62 +581,66 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e return nil } -func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize, cfg sealiface.Config) (minExpEpoch, targetEpoch abi.ChainEpoch, err error) { - var candidates []*pendingPiece +// pendingPieceIndex is an index in the Sealing.pendingPieces map +type pendingPieceIndex cid.Cid - for _, piece := range m.pendingPieces { - if piece.assigned { - continue // already assigned to a sector, skip +type pieceBound struct { + epoch abi.ChainEpoch + + // boundStart marks deal /end/ epoch; only deals with boundStart lower or equal to expiration of a given sector can be + // put into that sector + boundStart []pendingPieceIndex + + // boundEnd marks deal claim TermMax; only deals with boundEnd higher or equal to expiration of a given sector can be + // put into that sector + boundEnd []pendingPieceIndex + + dealBytesInBound abi.UnpaddedPieceSize +} + +func (m *Sealing) pendingPieceEpochBounds() []pieceBound { + boundsByEpoch := map[abi.ChainEpoch]*pieceBound{} + + for ppi, piece := range m.pendingPieces { + // start bound on deal end + if boundsByEpoch[piece.deal.DealProposal.EndEpoch] == nil { + boundsByEpoch[piece.deal.DealProposal.EndEpoch] = &pieceBound{ + epoch: piece.deal.DealProposal.EndEpoch, + } } - candidates = append(candidates, piece) + boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart = append(boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart, pendingPieceIndex(ppi)) + + // end bound on term max + if boundsByEpoch[piece.claimTerms.claimTermEnd] == nil { + boundsByEpoch[piece.claimTerms.claimTermEnd] = &pieceBound{ + epoch: piece.claimTerms.claimTermEnd, + } + } + boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd = append(boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd, pendingPieceIndex(ppi)) } - // earliest expiration first - sort.Slice(candidates, func(i, j int) bool { - return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch + out := make([]pieceBound, 0, len(boundsByEpoch)) + for _, bound := range boundsByEpoch { + out = append(out, *bound) + } + + sort.Slice(out, func(i, j int) bool { + return out[i].epoch < out[j].epoch }) - var totalBytes uint64 - var full bool - - // Find the expiration of the last deal which can fit into the sector, use that as the initial target - for _, candidate := range candidates { - totalBytes += uint64(candidate.size) - targetEpoch = candidate.deal.DealProposal.EndEpoch - - if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) { - full = true - break + var curBoundBytes abi.UnpaddedPieceSize + for i, bound := range out { + for _, ppi := range bound.boundStart { + curBoundBytes += m.pendingPieces[cid.Cid(ppi)].size } + for _, ppi := range bound.boundEnd { + curBoundBytes -= m.pendingPieces[cid.Cid(ppi)].size + } + + out[i].dealBytesInBound = curBoundBytes } - ts, err := m.Api.ChainHead(ctx) - if err != nil { - return 0, 0, xerrors.Errorf("getting current epoch: %w", err) - } - - // if the sector isn't full, use max deal duration as the target - if !full { - minDur, maxDur := policy.DealDurationBounds(0) - - minExpEpoch = ts.Height() + minDur - targetEpoch = ts.Height() + maxDur - } - - // make sure that at least one deal in the queue is within the expiration - if len(candidates) > 0 && candidates[0].deal.DealProposal.EndEpoch > minExpEpoch { - minExpEpoch = candidates[0].deal.DealProposal.EndEpoch - } - - // apply user minimums - if abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)+ts.Height() > minExpEpoch { - minExpEpoch = abi.ChainEpoch(cfg.MinUpgradeSectorExpiration) + ts.Height() - } - if abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)+ts.Height() > targetEpoch { - targetEpoch = abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration) + ts.Height() - } - - return minExpEpoch, targetEpoch, nil + return out } func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, cfg sealiface.Config, ef expFn) (bool, error) { @@ -584,17 +648,37 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP return false, nil } + ts, err := m.Api.ChainHead(ctx) + if err != nil { + return false, err + } ssize, err := sp.SectorSize() if err != nil { - return false, xerrors.Errorf("getting sector size: %w", err) + return false, err } - minExpirationEpoch, targetExpirationEpoch, err := m.calcTargetExpiration(ctx, ssize, cfg) - if err != nil { - return false, xerrors.Errorf("calculating min target expiration: %w", err) + + pieceBounds := m.pendingPieceEpochBounds() + + findBound := func(sectorExp abi.ChainEpoch) *pieceBound { + if len(pieceBounds) == 0 { + return nil + } + f := sort.Search(len(pieceBounds), func(i int) bool { + return sectorExp <= pieceBounds[i].epoch + }) + if f == 0 { + // all piece bounds are after sector expiration + return nil + } + return &pieceBounds[f-1] } + targetExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration) + minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration) + var candidate abi.SectorID var bestExpiration abi.ChainEpoch + var bestDealBytes abi.PaddedPieceSize bestPledge := types.TotalFilecoinInt for s := range m.available { @@ -611,19 +695,43 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP return false } if !active { - log.Debugw("skipping available sector", "reason", "not active") + log.Debugw("skipping available sector", "sector", sid, "reason", "not active") return false } return true } + if expirationEpoch < minExpirationEpoch { + log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below MinUpgradeSectorExpiration") + } + + pb := findBound(expirationEpoch) + if pb == nil { + log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below deal bounds") + continue + } + + // if the sector has less than one sector worth of candidate deals, and + // the best candidate has more candidate deals, this sector isn't better + if pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) { + if bestDealBytes > pb.dealBytesInBound.Padded() { + continue + } + } + // if best is below target, we want larger expirations // if best is above target, we want lower pledge, but only if still above target + // todo: after nv17 "target expiration" doesn't really make that much sense + // (tho to be fair it doesn't make too much sense now either) + // we probably want the lowest expiration that's still above the configured + // minimum, and can fit most candidate deals + if bestExpiration < targetExpirationEpoch { if expirationEpoch > bestExpiration && slowChecks(s.Number) { bestExpiration = expirationEpoch bestPledge = pledge + bestDealBytes = pb.dealBytesInBound.Padded() candidate = s } continue @@ -632,6 +740,7 @@ func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealP if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) { bestExpiration = expirationEpoch bestPledge = pledge + bestDealBytes = pb.dealBytesInBound.Padded() candidate = s } } diff --git a/storage/pipeline/mocks/api.go b/storage/pipeline/mocks/api.go index b6b6c7022..066fe996e 100644 --- a/storage/pipeline/mocks/api.go +++ b/storage/pipeline/mocks/api.go @@ -16,6 +16,7 @@ import ( abi "github.com/filecoin-project/go-state-types/abi" big "github.com/filecoin-project/go-state-types/big" miner "github.com/filecoin-project/go-state-types/builtin/v9/miner" + verifreg "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" crypto "github.com/filecoin-project/go-state-types/crypto" dline "github.com/filecoin-project/go-state-types/dline" network "github.com/filecoin-project/go-state-types/network" @@ -138,6 +139,21 @@ func (mr *MockSealingAPIMockRecorder) StateComputeDataCID(arg0, arg1, arg2, arg3 return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateComputeDataCID", reflect.TypeOf((*MockSealingAPI)(nil).StateComputeDataCID), arg0, arg1, arg2, arg3, arg4) } +// StateGetAllocationForPendingDeal mocks base method. +func (m *MockSealingAPI) StateGetAllocationForPendingDeal(arg0 context.Context, arg1 abi.DealID, arg2 types.TipSetKey) (*verifreg.Allocation, error) { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "StateGetAllocationForPendingDeal", arg0, arg1, arg2) + ret0, _ := ret[0].(*verifreg.Allocation) + ret1, _ := ret[1].(error) + return ret0, ret1 +} + +// StateGetAllocationForPendingDeal indicates an expected call of StateGetAllocationForPendingDeal. +func (mr *MockSealingAPIMockRecorder) StateGetAllocationForPendingDeal(arg0, arg1, arg2 interface{}) *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "StateGetAllocationForPendingDeal", reflect.TypeOf((*MockSealingAPI)(nil).StateGetAllocationForPendingDeal), arg0, arg1, arg2) +} + // StateGetRandomnessFromBeacon mocks base method. func (m *MockSealingAPI) StateGetRandomnessFromBeacon(arg0 context.Context, arg1 crypto.DomainSeparationTag, arg2 abi.ChainEpoch, arg3 []byte, arg4 types.TipSetKey) (abi.Randomness, error) { m.ctrl.T.Helper() diff --git a/storage/pipeline/sealing.go b/storage/pipeline/sealing.go index 153a4cc32..6caa9ddc6 100644 --- a/storage/pipeline/sealing.go +++ b/storage/pipeline/sealing.go @@ -16,6 +16,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/builtin/v9/miner" + verifregtypes "github.com/filecoin-project/go-state-types/builtin/v9/verifreg" "github.com/filecoin-project/go-state-types/crypto" "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/network" @@ -69,6 +70,7 @@ type SealingAPI interface { StateGetRandomnessFromTickets(ctx context.Context, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte, tsk types.TipSetKey) (abi.Randomness, error) ChainReadObj(context.Context, cid.Cid) ([]byte, error) StateMinerAllocated(context.Context, address.Address, types.TipSetKey) (*bitfield.BitField, error) + StateGetAllocationForPendingDeal(ctx context.Context, dealId abi.DealID, tsk types.TipSetKey) (*verifregtypes.Allocation, error) // Address selector WalletBalance(context.Context, address.Address) (types.BigInt, error) @@ -131,22 +133,35 @@ type Sealing struct { } type openSector struct { - used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors - number abi.SectorNumber - ccUpdate bool + used abi.UnpaddedPieceSize // change to bitfield/rle when AddPiece gains offset support to better fill sectors + lastDealEnd abi.ChainEpoch + number abi.SectorNumber + ccUpdate bool maybeAccept func(cid.Cid) error // called with inputLk } -func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) { +func (o *openSector) checkDealAssignable(piece *pendingPiece, expF expFn) (bool, error) { + // if there are deals assigned, check that no assigned deal expires after termMax + if o.lastDealEnd > piece.claimTerms.claimTermEnd { + return false, nil + } + + // check that in case of upgrade sectors, sector expiration is at least deal expiration if !o.ccUpdate { return true, nil } - expiration, _, err := expF(o.number) + sectorExpiration, _, err := expF(o.number) if err != nil { return false, err } - return expiration >= dealEnd, nil + + // check that in case of upgrade sector, it's expiration isn't above deals claim TermMax + if sectorExpiration > piece.claimTerms.claimTermEnd { + return false, nil + } + + return sectorExpiration >= piece.deal.DealProposal.EndEpoch, nil } type pieceAcceptResp struct { @@ -155,6 +170,11 @@ type pieceAcceptResp struct { err error } +type pieceClaimBounds struct { + // dealStart + termMax + claimTermEnd abi.ChainEpoch +} + type pendingPiece struct { doneCh chan struct{} resp *pieceAcceptResp @@ -162,6 +182,8 @@ type pendingPiece struct { size abi.UnpaddedPieceSize deal api.PieceDealInfo + claimTerms pieceClaimBounds + data storiface.Data assigned bool // assigned to a sector?