bd10bdf99a
* build: Bump version to v1.17.3-dev * build: set version to v1.18.0-dev * chore: actors: Allow builtin-actors to return a map of methods (#9342) * Allow builtin-actors to return a map of methods * go mod * Fix tests * Fix tests, check carefully please * Delete lotus-pond (#9352) * feat: add StateNetworkVersion to mpool API * chore: refactor: rename NewestNetworkVersion * feat: actors: Integrate datacap actor into lotus (#9348) * Integrate datacap actor * Implement datacap actor in chain/builtin * feat: support typed errors over RPC * chore: deps: update to go-jsonrpc 0.1.8 * remove duplicate import * fix: itest: check for closed connection * chore: refactor: move retry test to API * address magik supernit * Add ability to only have single partition per msg for partitions with recovery sectors * doc gen * Address comments * Return beneficiary info from miner state Info() * Update builtin-actors to dev/20220922-v9 which includes FIP-0045 changes in progress * Integrate verifreg changes to lotus * Setup datacap actor * Update builtin-actors to dev/20220922-v9-1 * Update datacap actor to query datacap instead of verifreg * update gst * update markets * update actors with hamt fix * update gst * Update datacap to parse tokens * Update bundles * datacap and verifreg actors use ID addresses without protocol byte * update builtin-actors to rc1 * update go-fil-markets * Update bundles to rc2 * Integrate the v9 migration * Add api for getting allocation * Add upgrade epoch for butterfly * Tweak PreSeal struct to be infra-friendly * docsgen * More tweaking of PreSeal for genesis * review fixes * Use fake cid for test * add butterfly artifacts for oct 5 upgrade * check datacaps for v8 verifreg match v9 datacap actor * Remove print statements * Update to go-state-types master * Update to go-state-types v0.9.0-rc1 * review fixes * use go-fil-markets v1.24.0-v17 * Add accessors for allocations and claims maps * fix: missing permissions tag * butterfly * update butterfly artifacts * sealing pipeline: Prepare deal assigning logic for FIP-45 * sealing pipeline: Get allocationId with StateApi * use NoAllocationID instead of nil AllocationId * address review * Add datacap actor to registry.go * Add cli for listing allocations and removing expired allocations * Update to go-state-types master * deps: upgrade go-merkledag to 0.8.0 * shark params * Update cli/filplus.go Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * revert change to verifreg util * docsgen-cli * miss the stuff * Update FFI * Update go-state-types to v0.9.0 * Update builtin-actors to v9.0.0 * add calib upgrade epcoh * update the upgrade envvar * kill shark * Remove fvm splash banner from nv17 upgrade * check invariance for pending deals and allocations * check pending verified deal proposal migrated to allocation * Add check for unsealed CID in precommit sectors * Fix counting of allocations in nv17 migration test * make gen * pass state trees as pointers * Add assertion that migrations with & without cache are the same * compare allocation to verified deal proposal * Fix miner state precommit info * fix migration test tool * add changelog * Update to go-state-types v0.9.1 * Integrate builtin-actors v9.0.1 * chore: ver: bump version for rc3 (#9512) * Bump version to 1.18.0-rc3 * Update CHANGELOG.md * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Update CHANGELOG.md Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Co-authored-by: Aayush Rajasekaran <arajasek94@gmail.com> * Migration: Use autobatch bs * Fix autobatch Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> * Invoker: Use MethodMeta from go-state-types * Add a second premigration for nv17 * Add more shed tools for migration checking * address review * Lotus release v1.18.0-rc4 * fix: ci: fix app-image build on ci (#9527) * Remove old go version first * Add GO_VERSION file * Use GO_VERSION to set / verify go version * mv GO_VERSION GO_VERSION_MIN * Use GO_VERSION_MIN in Makefile check Co-authored-by: Ian Davis <jungziege@gmail.com> * Update to latest go-state-types for migration fixes * go mod tidy * fix: use api.ErrActorNotFound instead of types.ErrActorNotFound * fix: add fields to ForkUpgradeParams * docs: update actors_version_checklist.md * chore: fix lint * update to go state type v0.9.6 with market migration fix (#9545) * update go-state-types to v-0.9.7 * Add invariant checks to migration * fix invariant check: number of entries in datacap actor should include verifreg * Invariant checks: Only include not-activated deals * test: nv17 migration * Address review * add lotus-shed invariance method * Migration cli takes a stateroot cid and a height * make gen * Update to builtin-actors v9.0.2 * Failing test that shows that notaries can remove datacap from the verifreg actor * Test that should pass when the problem is solved * make gen * Review fixes * statemanager call function will return call information even if call errors * update go-state-types * update builtin-actors * bubble up errors properly from ApplyImplicitMessage * bump to rc5 * set new upgrade heights for calibnet * set new upgrade height for butterfly * tweak calibnet upgrade schedule * clarify changelog note about calibnet * butterfly * update calibnet artifacts * Allow setting local bundles for Debug FVM for av 9+ * fix: autobatch: remove potential deadlock when a block is missing Check the _underlying_ blockstore instead of recursing. Also, drop the lock before we do that. * fix imports * build: set shark mainnet epoch (#9640) * chore: build: Lotus release v1.18.0 (#9641) * Lotus release v1.18.0 * add changelog * address review * changelog improvement Co-authored-by: Jennifer Wang <jiayingw703@gmail.com> Co-authored-by: Jiaying Wang <42981373+jennijuju@users.noreply.github.com> Signed-off-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Łukasz Magiera <magik6k@gmail.com> Co-authored-by: Łukasz Magiera <magik6k@users.noreply.github.com> Co-authored-by: Aayush <arajasek94@gmail.com> Co-authored-by: Geoff Stuart <geoff.vball@gmail.com> Co-authored-by: Shrenuj Bansal <shrenuj.bansal@protocol.ai> Co-authored-by: simlecode <69969590+simlecode@users.noreply.github.com> Co-authored-by: Rod Vagg <rod@vagg.org> Co-authored-by: Jakub Sztandera <kubuxu@protocol.ai> Co-authored-by: Ian Davis <jungziege@gmail.com> Co-authored-by: zenground0 <ZenGround0@users.noreply.github.com> Co-authored-by: Steven Allen <steven@stebalien.com>
947 lines
28 KiB
Go
947 lines
28 KiB
Go
package sealing
|
|
|
|
import (
|
|
"context"
|
|
"sort"
|
|
"time"
|
|
|
|
"github.com/ipfs/go-cid"
|
|
"golang.org/x/xerrors"
|
|
|
|
"github.com/filecoin-project/go-commp-utils/zerocomm"
|
|
"github.com/filecoin-project/go-padreader"
|
|
"github.com/filecoin-project/go-state-types/abi"
|
|
"github.com/filecoin-project/go-state-types/big"
|
|
"github.com/filecoin-project/go-state-types/network"
|
|
"github.com/filecoin-project/go-statemachine"
|
|
|
|
"github.com/filecoin-project/lotus/api"
|
|
"github.com/filecoin-project/lotus/build"
|
|
"github.com/filecoin-project/lotus/chain/actors/policy"
|
|
"github.com/filecoin-project/lotus/chain/types"
|
|
"github.com/filecoin-project/lotus/storage/pipeline/lib/nullreader"
|
|
"github.com/filecoin-project/lotus/storage/pipeline/sealiface"
|
|
"github.com/filecoin-project/lotus/storage/sealer"
|
|
"github.com/filecoin-project/lotus/storage/sealer/ffiwrapper"
|
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
|
"github.com/filecoin-project/lotus/storage/sectorblocks"
|
|
)
|
|
|
|
func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) error {
|
|
var used abi.UnpaddedPieceSize
|
|
var lastDealEnd abi.ChainEpoch
|
|
for _, piece := range sector.Pieces {
|
|
used += piece.Piece.Size.Unpadded()
|
|
|
|
if piece.DealInfo != nil && piece.DealInfo.DealProposal.EndEpoch > lastDealEnd {
|
|
lastDealEnd = piece.DealInfo.DealProposal.EndEpoch
|
|
}
|
|
}
|
|
|
|
m.inputLk.Lock()
|
|
|
|
if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber {
|
|
m.nextDealSector = nil
|
|
}
|
|
|
|
sid := m.minerSectorID(sector.SectorNumber)
|
|
|
|
if len(m.assignedPieces[sid]) > 0 {
|
|
m.inputLk.Unlock()
|
|
// got assigned more pieces in the AddPiece state
|
|
return ctx.Send(SectorAddPiece{})
|
|
}
|
|
|
|
started, err := m.maybeStartSealing(ctx, sector, used)
|
|
if err != nil || started {
|
|
delete(m.openSectors, m.minerSectorID(sector.SectorNumber))
|
|
|
|
m.inputLk.Unlock()
|
|
|
|
return err
|
|
}
|
|
|
|
if _, has := m.openSectors[sid]; !has {
|
|
m.openSectors[sid] = &openSector{
|
|
used: used,
|
|
maybeAccept: func(cid cid.Cid) error {
|
|
// todo check deal start deadline (configurable)
|
|
m.assignedPieces[sid] = append(m.assignedPieces[sid], cid)
|
|
|
|
return ctx.Send(SectorAddPiece{})
|
|
},
|
|
number: sector.SectorNumber,
|
|
ccUpdate: sector.CCUpdate,
|
|
}
|
|
} else {
|
|
// make sure we're only accounting for pieces which were correctly added
|
|
// (note that m.assignedPieces[sid] will always be empty here)
|
|
m.openSectors[sid].used = used
|
|
}
|
|
m.openSectors[sid].lastDealEnd = lastDealEnd
|
|
|
|
go func() {
|
|
defer m.inputLk.Unlock()
|
|
if err := m.updateInput(ctx.Context(), sector.SectorType); err != nil {
|
|
log.Errorf("%+v", err)
|
|
}
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (m *Sealing) maybeStartSealing(ctx statemachine.Context, sector SectorInfo, used abi.UnpaddedPieceSize) (bool, error) {
|
|
now := time.Now()
|
|
st := m.sectorTimers[m.minerSectorID(sector.SectorNumber)]
|
|
if st != nil {
|
|
if !st.Stop() { // timer expired, SectorStartPacking was/is being sent
|
|
// we send another SectorStartPacking in case one was sent in the handleAddPiece state
|
|
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
|
|
return true, ctx.Send(SectorStartPacking{})
|
|
}
|
|
}
|
|
|
|
ssize, err := sector.SectorType.SectorSize()
|
|
if err != nil {
|
|
return false, xerrors.Errorf("getting sector size")
|
|
}
|
|
|
|
maxDeals, err := getDealPerSectorLimit(ssize)
|
|
if err != nil {
|
|
return false, xerrors.Errorf("getting per-sector deal limit: %w", err)
|
|
}
|
|
|
|
if len(sector.dealIDs()) >= maxDeals {
|
|
// can't accept more deals
|
|
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "maxdeals")
|
|
return true, ctx.Send(SectorStartPacking{})
|
|
}
|
|
|
|
if used.Padded() == abi.PaddedPieceSize(ssize) {
|
|
// sector full
|
|
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "filled")
|
|
return true, ctx.Send(SectorStartPacking{})
|
|
}
|
|
|
|
if sector.CreationTime != 0 {
|
|
cfg, err := m.getConfig()
|
|
if err != nil {
|
|
return false, xerrors.Errorf("getting storage config: %w", err)
|
|
}
|
|
|
|
sealTime := time.Unix(sector.CreationTime, 0).Add(cfg.WaitDealsDelay)
|
|
|
|
// check deal age, start sealing when the deal closest to starting is within slack time
|
|
ts, err := m.Api.ChainHead(ctx.Context())
|
|
blockTime := time.Second * time.Duration(build.BlockDelaySecs)
|
|
if err != nil {
|
|
return false, xerrors.Errorf("API error getting head: %w", err)
|
|
}
|
|
for _, piece := range sector.Pieces {
|
|
if piece.DealInfo == nil {
|
|
continue
|
|
}
|
|
dealSafeSealEpoch := piece.DealInfo.DealProposal.StartEpoch - cfg.StartEpochSealingBuffer
|
|
dealSafeSealTime := time.Now().Add(time.Duration(dealSafeSealEpoch-ts.Height()) * blockTime)
|
|
if dealSafeSealTime.Before(sealTime) {
|
|
sealTime = dealSafeSealTime
|
|
}
|
|
}
|
|
|
|
if now.After(sealTime) {
|
|
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timeout")
|
|
return true, ctx.Send(SectorStartPacking{})
|
|
}
|
|
|
|
m.sectorTimers[m.minerSectorID(sector.SectorNumber)] = time.AfterFunc(sealTime.Sub(now), func() {
|
|
log.Infow("starting to seal deal sector", "sector", sector.SectorNumber, "trigger", "wait-timer")
|
|
|
|
if err := ctx.Send(SectorStartPacking{}); err != nil {
|
|
log.Errorw("sending SectorStartPacking event failed", "sector", sector.SectorNumber, "error", err)
|
|
}
|
|
})
|
|
}
|
|
|
|
return false, nil
|
|
}
|
|
|
|
func (m *Sealing) handleAddPiece(ctx statemachine.Context, sector SectorInfo) error {
|
|
ssize, err := sector.SectorType.SectorSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
res := SectorPieceAdded{}
|
|
|
|
m.inputLk.Lock()
|
|
|
|
pending, ok := m.assignedPieces[m.minerSectorID(sector.SectorNumber)]
|
|
if ok {
|
|
delete(m.assignedPieces, m.minerSectorID(sector.SectorNumber))
|
|
}
|
|
m.inputLk.Unlock()
|
|
if !ok {
|
|
// nothing to do here (might happen after a restart in AddPiece)
|
|
return ctx.Send(res)
|
|
}
|
|
|
|
var offset abi.UnpaddedPieceSize
|
|
pieceSizes := make([]abi.UnpaddedPieceSize, len(sector.Pieces))
|
|
for i, p := range sector.Pieces {
|
|
pieceSizes[i] = p.Piece.Size.Unpadded()
|
|
offset += p.Piece.Size.Unpadded()
|
|
}
|
|
|
|
maxDeals, err := getDealPerSectorLimit(ssize)
|
|
if err != nil {
|
|
return xerrors.Errorf("getting per-sector deal limit: %w", err)
|
|
}
|
|
|
|
for i, piece := range pending {
|
|
m.inputLk.Lock()
|
|
deal, ok := m.pendingPieces[piece]
|
|
m.inputLk.Unlock()
|
|
if !ok {
|
|
return xerrors.Errorf("piece %s assigned to sector %d not found", piece, sector.SectorNumber)
|
|
}
|
|
|
|
if len(sector.dealIDs())+(i+1) > maxDeals {
|
|
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
|
|
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("too many deals assigned to sector %d, dropping deal", sector.SectorNumber))
|
|
continue
|
|
}
|
|
|
|
pads, padLength := ffiwrapper.GetRequiredPadding(offset.Padded(), deal.size.Padded())
|
|
|
|
if offset.Padded()+padLength+deal.size.Padded() > abi.PaddedPieceSize(ssize) {
|
|
// todo: this is rather unlikely to happen, but in case it does, return the deal to waiting queue instead of failing it
|
|
deal.accepted(sector.SectorNumber, offset, xerrors.Errorf("piece %s assigned to sector %d with not enough space", piece, sector.SectorNumber))
|
|
continue
|
|
}
|
|
|
|
offset += padLength.Unpadded()
|
|
|
|
for _, p := range pads {
|
|
expectCid := zerocomm.ZeroPieceCommitment(p.Unpadded())
|
|
|
|
ppi, err := m.sealer.AddPiece(sealer.WithPriority(ctx.Context(), DealSectorPriority),
|
|
m.minerSector(sector.SectorType, sector.SectorNumber),
|
|
pieceSizes,
|
|
p.Unpadded(),
|
|
nullreader.NewNullReader(p.Unpadded()))
|
|
if err != nil {
|
|
err = xerrors.Errorf("writing padding piece: %w", err)
|
|
deal.accepted(sector.SectorNumber, offset, err)
|
|
return ctx.Send(SectorAddPieceFailed{err})
|
|
}
|
|
if !ppi.PieceCID.Equals(expectCid) {
|
|
err = xerrors.Errorf("got unexpected padding piece CID: expected:%s, got:%s", expectCid, ppi.PieceCID)
|
|
deal.accepted(sector.SectorNumber, offset, err)
|
|
return ctx.Send(SectorAddPieceFailed{err})
|
|
}
|
|
|
|
pieceSizes = append(pieceSizes, p.Unpadded())
|
|
res.NewPieces = append(res.NewPieces, api.SectorPiece{
|
|
Piece: ppi,
|
|
})
|
|
}
|
|
|
|
ppi, err := m.sealer.AddPiece(sealer.WithPriority(ctx.Context(), DealSectorPriority),
|
|
m.minerSector(sector.SectorType, sector.SectorNumber),
|
|
pieceSizes,
|
|
deal.size,
|
|
deal.data)
|
|
if err != nil {
|
|
err = xerrors.Errorf("writing piece: %w", err)
|
|
deal.accepted(sector.SectorNumber, offset, err)
|
|
return ctx.Send(SectorAddPieceFailed{err})
|
|
}
|
|
if !ppi.PieceCID.Equals(deal.deal.DealProposal.PieceCID) {
|
|
err = xerrors.Errorf("got unexpected piece CID: expected:%s, got:%s", deal.deal.DealProposal.PieceCID, ppi.PieceCID)
|
|
deal.accepted(sector.SectorNumber, offset, err)
|
|
return ctx.Send(SectorAddPieceFailed{err})
|
|
}
|
|
|
|
log.Infow("deal added to a sector", "deal", deal.deal.DealID, "sector", sector.SectorNumber, "piece", ppi.PieceCID)
|
|
|
|
deal.accepted(sector.SectorNumber, offset, nil)
|
|
|
|
offset += deal.size
|
|
pieceSizes = append(pieceSizes, deal.size)
|
|
|
|
res.NewPieces = append(res.NewPieces, api.SectorPiece{
|
|
Piece: ppi,
|
|
DealInfo: &deal.deal,
|
|
})
|
|
}
|
|
|
|
return ctx.Send(res)
|
|
}
|
|
|
|
func (m *Sealing) handleAddPieceFailed(ctx statemachine.Context, sector SectorInfo) error {
|
|
return ctx.Send(SectorRetryWaitDeals{})
|
|
}
|
|
|
|
func (m *Sealing) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo) (api.SectorOffset, error) {
|
|
log.Infof("Adding piece for deal %d (publish msg: %s)", deal.DealID, deal.PublishCid)
|
|
if (padreader.PaddedSize(uint64(size))) != size {
|
|
return api.SectorOffset{}, xerrors.Errorf("cannot allocate unpadded piece")
|
|
}
|
|
|
|
sp, err := m.currentSealProof(ctx)
|
|
if err != nil {
|
|
return api.SectorOffset{}, xerrors.Errorf("getting current seal proof type: %w", err)
|
|
}
|
|
|
|
ssize, err := sp.SectorSize()
|
|
if err != nil {
|
|
return api.SectorOffset{}, err
|
|
}
|
|
|
|
if size > abi.PaddedPieceSize(ssize).Unpadded() {
|
|
return api.SectorOffset{}, xerrors.Errorf("piece cannot fit into a sector")
|
|
}
|
|
|
|
if _, err := deal.DealProposal.Cid(); err != nil {
|
|
return api.SectorOffset{}, xerrors.Errorf("getting proposal CID: %w", err)
|
|
}
|
|
|
|
cfg, err := m.getConfig()
|
|
if err != nil {
|
|
return api.SectorOffset{}, xerrors.Errorf("getting config: %w", err)
|
|
}
|
|
|
|
ts, err := m.Api.ChainHead(ctx)
|
|
if err != nil {
|
|
return api.SectorOffset{}, xerrors.Errorf("couldnt get chain head: %w", err)
|
|
}
|
|
if ts.Height()+cfg.StartEpochSealingBuffer > deal.DealProposal.StartEpoch {
|
|
return api.SectorOffset{}, xerrors.Errorf(
|
|
"cannot add piece for deal with piece CID %s: current epoch %d has passed deal proposal start epoch %d",
|
|
deal.DealProposal.PieceCID, ts.Height(), deal.DealProposal.StartEpoch)
|
|
}
|
|
|
|
claimTerms, err := m.getClaimTerms(ctx, deal, ts.Key())
|
|
if err != nil {
|
|
return api.SectorOffset{}, err
|
|
}
|
|
|
|
m.inputLk.Lock()
|
|
if pp, exist := m.pendingPieces[proposalCID(deal)]; exist {
|
|
m.inputLk.Unlock()
|
|
|
|
// we already have a pre-existing add piece call for this deal, let's wait for it to finish and see if it's successful
|
|
res, err := waitAddPieceResp(ctx, pp)
|
|
if err != nil {
|
|
return api.SectorOffset{}, err
|
|
}
|
|
if res.err == nil {
|
|
// all good, return the response
|
|
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
|
|
}
|
|
// if there was an error waiting for a pre-existing add piece call, let's retry
|
|
m.inputLk.Lock()
|
|
}
|
|
|
|
// addPendingPiece takes over m.inputLk
|
|
pp := m.addPendingPiece(ctx, size, data, deal, claimTerms, sp)
|
|
|
|
res, err := waitAddPieceResp(ctx, pp)
|
|
if err != nil {
|
|
return api.SectorOffset{}, err
|
|
}
|
|
return api.SectorOffset{Sector: res.sn, Offset: res.offset.Padded()}, res.err
|
|
}
|
|
|
|
func (m *Sealing) getClaimTerms(ctx context.Context, deal api.PieceDealInfo, tsk types.TipSetKey) (pieceClaimBounds, error) {
|
|
nv, err := m.Api.StateNetworkVersion(ctx, tsk)
|
|
if err != nil {
|
|
return pieceClaimBounds{}, err
|
|
}
|
|
|
|
if nv >= network.Version17 {
|
|
all, err := m.Api.StateGetAllocationForPendingDeal(ctx, deal.DealID, tsk)
|
|
if err != nil {
|
|
return pieceClaimBounds{}, err
|
|
}
|
|
if all != nil {
|
|
return pieceClaimBounds{
|
|
claimTermEnd: deal.DealProposal.StartEpoch + all.TermMax,
|
|
}, nil
|
|
}
|
|
}
|
|
|
|
// no allocation for this deal, so just use a really high number for "term end"
|
|
return pieceClaimBounds{
|
|
claimTermEnd: deal.DealProposal.EndEpoch + policy.GetSectorMaxLifetime(abi.RegisteredSealProof_StackedDrg32GiBV1_1, network.Version17),
|
|
}, nil
|
|
}
|
|
|
|
// called with m.inputLk; transfers the lock to another goroutine!
|
|
func (m *Sealing) addPendingPiece(ctx context.Context, size abi.UnpaddedPieceSize, data storiface.Data, deal api.PieceDealInfo, ct pieceClaimBounds, sp abi.RegisteredSealProof) *pendingPiece {
|
|
doneCh := make(chan struct{})
|
|
pp := &pendingPiece{
|
|
size: size,
|
|
deal: deal,
|
|
claimTerms: ct,
|
|
|
|
data: data,
|
|
|
|
doneCh: doneCh,
|
|
assigned: false,
|
|
}
|
|
pp.accepted = func(sn abi.SectorNumber, offset abi.UnpaddedPieceSize, err error) {
|
|
pp.resp = &pieceAcceptResp{sn, offset, err}
|
|
close(pp.doneCh)
|
|
}
|
|
|
|
m.pendingPieces[proposalCID(deal)] = pp
|
|
go func() {
|
|
defer m.inputLk.Unlock()
|
|
if err := m.updateInput(ctx, sp); err != nil {
|
|
log.Errorf("%+v", err)
|
|
}
|
|
}()
|
|
|
|
return pp
|
|
}
|
|
|
|
func waitAddPieceResp(ctx context.Context, pp *pendingPiece) (*pieceAcceptResp, error) {
|
|
select {
|
|
case <-pp.doneCh:
|
|
res := pp.resp
|
|
return res, nil
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
}
|
|
}
|
|
|
|
func (m *Sealing) SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error {
|
|
sp, err := m.currentSealProof(ctx)
|
|
if err != nil {
|
|
return xerrors.Errorf("failed to get current seal proof: %w", err)
|
|
}
|
|
log.Debug("pieces to sector matching waiting for lock")
|
|
m.inputLk.Lock()
|
|
defer m.inputLk.Unlock()
|
|
return m.updateInput(ctx, sp)
|
|
}
|
|
|
|
type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error)
|
|
|
|
// called with m.inputLk
|
|
func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error {
|
|
memo := make(map[abi.SectorNumber]struct {
|
|
e abi.ChainEpoch
|
|
p abi.TokenAmount
|
|
})
|
|
getExpirationCached := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) {
|
|
if e, ok := memo[sn]; ok {
|
|
return e.e, e.p, nil
|
|
}
|
|
onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, types.TipSetKey{})
|
|
if err != nil {
|
|
return 0, big.Zero(), err
|
|
}
|
|
memo[sn] = struct {
|
|
e abi.ChainEpoch
|
|
p abi.TokenAmount
|
|
}{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge}
|
|
return onChainInfo.Expiration, onChainInfo.InitialPledge, nil
|
|
}
|
|
|
|
ssize, err := sp.SectorSize()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
type match struct {
|
|
sector abi.SectorID
|
|
deal cid.Cid
|
|
|
|
dealEnd abi.ChainEpoch
|
|
claimTermEnd abi.ChainEpoch
|
|
|
|
size abi.UnpaddedPieceSize
|
|
padding abi.UnpaddedPieceSize
|
|
}
|
|
|
|
var matches []match
|
|
toAssign := map[cid.Cid]struct{}{} // used to maybe create new sectors
|
|
|
|
// todo: this is distinctly O(n^2), may need to be optimized for tiny deals and large scale miners
|
|
// (unlikely to be a problem now)
|
|
for proposalCid, piece := range m.pendingPieces {
|
|
if piece.assigned {
|
|
continue // already assigned to a sector, skip
|
|
}
|
|
|
|
toAssign[proposalCid] = struct{}{}
|
|
|
|
for id, sector := range m.openSectors {
|
|
avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used
|
|
// check that sector lifetime is long enough to fit deal using latest expiration from on chain
|
|
|
|
ok, err := sector.checkDealAssignable(piece, getExpirationCached)
|
|
if err != nil {
|
|
log.Errorf("failed to check expiration for cc Update sector %d", sector.number)
|
|
continue
|
|
}
|
|
if !ok {
|
|
exp, _, _ := getExpirationCached(sector.number)
|
|
|
|
// todo move this log into checkDealAssignable, make more detailed about the reason
|
|
log.Debugf("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch)
|
|
continue
|
|
}
|
|
|
|
if piece.size <= avail { // (note: if we have enough space for the piece, we also have enough space for inter-piece padding)
|
|
matches = append(matches, match{
|
|
sector: id,
|
|
deal: proposalCid,
|
|
|
|
dealEnd: piece.deal.DealProposal.EndEpoch,
|
|
claimTermEnd: piece.claimTerms.claimTermEnd,
|
|
|
|
size: piece.size,
|
|
padding: avail % piece.size,
|
|
})
|
|
}
|
|
}
|
|
}
|
|
sort.Slice(matches, func(i, j int) bool {
|
|
// todo maybe sort by expiration
|
|
|
|
if matches[i].padding != matches[j].padding { // less padding is better
|
|
return matches[i].padding < matches[j].padding
|
|
}
|
|
|
|
if matches[i].size != matches[j].size { // larger pieces are better
|
|
return matches[i].size < matches[j].size
|
|
}
|
|
|
|
return matches[i].sector.Number < matches[j].sector.Number // prefer older sectors
|
|
})
|
|
|
|
log.Debugw("updateInput matching", "matches", len(matches), "toAssign", len(toAssign), "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
|
|
|
|
var assigned int
|
|
for _, mt := range matches {
|
|
if m.pendingPieces[mt.deal].assigned {
|
|
assigned++
|
|
continue
|
|
}
|
|
|
|
if _, found := m.openSectors[mt.sector]; !found {
|
|
continue
|
|
}
|
|
|
|
// late checks
|
|
|
|
avail := abi.PaddedPieceSize(ssize).Unpadded() - m.openSectors[mt.sector].used
|
|
|
|
if mt.size > avail {
|
|
continue
|
|
}
|
|
|
|
if m.openSectors[mt.sector].lastDealEnd > mt.claimTermEnd {
|
|
continue
|
|
}
|
|
|
|
// assign the piece!
|
|
|
|
err := m.openSectors[mt.sector].maybeAccept(mt.deal)
|
|
if err != nil {
|
|
m.pendingPieces[mt.deal].accepted(mt.sector.Number, 0, err) // non-error case in handleAddPiece
|
|
}
|
|
|
|
m.openSectors[mt.sector].used += mt.padding + mt.size
|
|
if mt.dealEnd > m.openSectors[mt.sector].lastDealEnd {
|
|
m.openSectors[mt.sector].lastDealEnd = mt.dealEnd
|
|
}
|
|
|
|
m.pendingPieces[mt.deal].assigned = true
|
|
delete(toAssign, mt.deal)
|
|
|
|
if err != nil {
|
|
log.Errorf("sector %d rejected deal %s: %+v", mt.sector, mt.deal, err)
|
|
continue
|
|
}
|
|
}
|
|
|
|
log.Debugw("updateInput matching done", "matches", len(matches), "toAssign", len(toAssign), "assigned", assigned, "openSectors", len(m.openSectors), "pieces", len(m.pendingPieces))
|
|
|
|
if len(toAssign) > 0 {
|
|
log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors)
|
|
if err := m.tryGetDealSector(ctx, sp, getExpirationCached); err != nil {
|
|
log.Errorw("Failed to create a new sector for deals", "error", err)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// pendingPieceIndex is an index in the Sealing.pendingPieces map
|
|
type pendingPieceIndex cid.Cid
|
|
|
|
type pieceBound struct {
|
|
epoch abi.ChainEpoch
|
|
|
|
// boundStart marks deal /end/ epoch; only deals with boundStart lower or equal to expiration of a given sector can be
|
|
// put into that sector
|
|
boundStart []pendingPieceIndex
|
|
|
|
// boundEnd marks deal claim TermMax; only deals with boundEnd higher or equal to expiration of a given sector can be
|
|
// put into that sector
|
|
boundEnd []pendingPieceIndex
|
|
|
|
dealBytesInBound abi.UnpaddedPieceSize
|
|
}
|
|
|
|
func (m *Sealing) pendingPieceEpochBounds() []pieceBound {
|
|
boundsByEpoch := map[abi.ChainEpoch]*pieceBound{}
|
|
|
|
for ppi, piece := range m.pendingPieces {
|
|
// start bound on deal end
|
|
if boundsByEpoch[piece.deal.DealProposal.EndEpoch] == nil {
|
|
boundsByEpoch[piece.deal.DealProposal.EndEpoch] = &pieceBound{
|
|
epoch: piece.deal.DealProposal.EndEpoch,
|
|
}
|
|
}
|
|
boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart = append(boundsByEpoch[piece.deal.DealProposal.EndEpoch].boundStart, pendingPieceIndex(ppi))
|
|
|
|
// end bound on term max
|
|
if boundsByEpoch[piece.claimTerms.claimTermEnd] == nil {
|
|
boundsByEpoch[piece.claimTerms.claimTermEnd] = &pieceBound{
|
|
epoch: piece.claimTerms.claimTermEnd,
|
|
}
|
|
}
|
|
boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd = append(boundsByEpoch[piece.claimTerms.claimTermEnd].boundEnd, pendingPieceIndex(ppi))
|
|
}
|
|
|
|
out := make([]pieceBound, 0, len(boundsByEpoch))
|
|
for _, bound := range boundsByEpoch {
|
|
out = append(out, *bound)
|
|
}
|
|
|
|
sort.Slice(out, func(i, j int) bool {
|
|
return out[i].epoch < out[j].epoch
|
|
})
|
|
|
|
var curBoundBytes abi.UnpaddedPieceSize
|
|
for i, bound := range out {
|
|
for _, ppi := range bound.boundStart {
|
|
curBoundBytes += m.pendingPieces[cid.Cid(ppi)].size
|
|
}
|
|
for _, ppi := range bound.boundEnd {
|
|
curBoundBytes -= m.pendingPieces[cid.Cid(ppi)].size
|
|
}
|
|
|
|
out[i].dealBytesInBound = curBoundBytes
|
|
}
|
|
|
|
return out
|
|
}
|
|
|
|
func (m *Sealing) maybeUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, cfg sealiface.Config, ef expFn) (bool, error) {
|
|
if len(m.available) == 0 {
|
|
return false, nil
|
|
}
|
|
|
|
ts, err := m.Api.ChainHead(ctx)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
ssize, err := sp.SectorSize()
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
pieceBounds := m.pendingPieceEpochBounds()
|
|
|
|
findBound := func(sectorExp abi.ChainEpoch) *pieceBound {
|
|
if len(pieceBounds) == 0 {
|
|
return nil
|
|
}
|
|
f := sort.Search(len(pieceBounds), func(i int) bool {
|
|
return sectorExp <= pieceBounds[i].epoch
|
|
})
|
|
if f == 0 {
|
|
// all piece bounds are after sector expiration
|
|
return nil
|
|
}
|
|
return &pieceBounds[f-1]
|
|
}
|
|
|
|
targetExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinTargetUpgradeSectorExpiration)
|
|
minExpirationEpoch := ts.Height() + abi.ChainEpoch(cfg.MinUpgradeSectorExpiration)
|
|
|
|
var candidate abi.SectorID
|
|
var bestExpiration abi.ChainEpoch
|
|
var bestDealBytes abi.PaddedPieceSize
|
|
bestPledge := types.TotalFilecoinInt
|
|
|
|
for s := range m.available {
|
|
expirationEpoch, pledge, err := ef(s.Number)
|
|
if err != nil {
|
|
log.Errorw("checking sector expiration", "error", err)
|
|
continue
|
|
}
|
|
|
|
slowChecks := func(sid abi.SectorNumber) bool {
|
|
active, err := m.sectorActive(ctx, types.TipSetKey{}, sid)
|
|
if err != nil {
|
|
log.Errorw("checking sector active", "error", err)
|
|
return false
|
|
}
|
|
if !active {
|
|
log.Debugw("skipping available sector", "sector", sid, "reason", "not active")
|
|
return false
|
|
}
|
|
return true
|
|
}
|
|
|
|
if expirationEpoch < minExpirationEpoch {
|
|
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below MinUpgradeSectorExpiration")
|
|
}
|
|
|
|
pb := findBound(expirationEpoch)
|
|
if pb == nil {
|
|
log.Debugw("skipping available sector", "sector", s.Number, "reason", "expiration below deal bounds")
|
|
continue
|
|
}
|
|
|
|
// if the sector has less than one sector worth of candidate deals, and
|
|
// the best candidate has more candidate deals, this sector isn't better
|
|
if pb.dealBytesInBound.Padded() < abi.PaddedPieceSize(ssize) {
|
|
if bestDealBytes > pb.dealBytesInBound.Padded() {
|
|
continue
|
|
}
|
|
}
|
|
|
|
// if best is below target, we want larger expirations
|
|
// if best is above target, we want lower pledge, but only if still above target
|
|
|
|
// todo: after nv17 "target expiration" doesn't really make that much sense
|
|
// (tho to be fair it doesn't make too much sense now either)
|
|
// we probably want the lowest expiration that's still above the configured
|
|
// minimum, and can fit most candidate deals
|
|
|
|
if bestExpiration < targetExpirationEpoch {
|
|
if expirationEpoch > bestExpiration && slowChecks(s.Number) {
|
|
bestExpiration = expirationEpoch
|
|
bestPledge = pledge
|
|
bestDealBytes = pb.dealBytesInBound.Padded()
|
|
candidate = s
|
|
}
|
|
continue
|
|
}
|
|
|
|
if expirationEpoch >= targetExpirationEpoch && pledge.LessThan(bestPledge) && slowChecks(s.Number) {
|
|
bestExpiration = expirationEpoch
|
|
bestPledge = pledge
|
|
bestDealBytes = pb.dealBytesInBound.Padded()
|
|
candidate = s
|
|
}
|
|
}
|
|
|
|
if bestExpiration < minExpirationEpoch {
|
|
log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpirationEpoch, "min", minExpirationEpoch, "candidate", candidate)
|
|
// didn't find a good sector / no sectors were available
|
|
return false, nil
|
|
}
|
|
|
|
log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge))
|
|
delete(m.available, candidate)
|
|
m.nextDealSector = &candidate.Number
|
|
return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{})
|
|
}
|
|
|
|
// call with m.inputLk
|
|
func (m *Sealing) createSector(ctx context.Context, cfg sealiface.Config, sp abi.RegisteredSealProof) (abi.SectorNumber, error) {
|
|
sid, err := m.NextSectorNumber(ctx)
|
|
if err != nil {
|
|
return 0, xerrors.Errorf("getting sector number: %w", err)
|
|
}
|
|
|
|
err = m.sealer.NewSector(ctx, m.minerSector(sp, sid))
|
|
if err != nil {
|
|
return 0, xerrors.Errorf("initializing sector: %w", err)
|
|
}
|
|
|
|
// update stats early, fsm planner would do that async
|
|
m.stats.updateSector(ctx, cfg, m.minerSectorID(sid), UndefinedSectorState)
|
|
|
|
return sid, err
|
|
}
|
|
|
|
func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error {
|
|
m.startupWait.Wait()
|
|
|
|
if m.nextDealSector != nil {
|
|
return nil // new sector is being created right now
|
|
}
|
|
|
|
cfg, err := m.getConfig()
|
|
if err != nil {
|
|
return xerrors.Errorf("getting storage config: %w", err)
|
|
}
|
|
|
|
// if we're above WaitDeals limit, we don't want to add more staging sectors
|
|
if cfg.MaxWaitDealsSectors > 0 && m.stats.curStaging() >= cfg.MaxWaitDealsSectors {
|
|
return nil
|
|
}
|
|
|
|
maxUpgrading := cfg.MaxSealingSectorsForDeals
|
|
if cfg.MaxUpgradingSectors > 0 {
|
|
maxUpgrading = cfg.MaxUpgradingSectors
|
|
}
|
|
|
|
canCreate := cfg.MakeNewSectorForDeals && !(cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals)
|
|
canUpgrade := !(maxUpgrading > 0 && m.stats.curSealing() >= maxUpgrading)
|
|
|
|
// we want to try to upgrade when:
|
|
// - we can upgrade and prefer upgrades
|
|
// - we don't prefer upgrades, but can't create a new sector
|
|
shouldUpgrade := canUpgrade && (!cfg.PreferNewSectorsForDeals || !canCreate)
|
|
|
|
log.Infow("new deal sector decision",
|
|
"sealing", m.stats.curSealing(),
|
|
"maxSeal", cfg.MaxSealingSectorsForDeals,
|
|
"maxUpgrade", maxUpgrading,
|
|
"preferNew", cfg.PreferNewSectorsForDeals,
|
|
"canCreate", canCreate,
|
|
"canUpgrade", canUpgrade,
|
|
"shouldUpgrade", shouldUpgrade)
|
|
|
|
if shouldUpgrade {
|
|
got, err := m.maybeUpgradeSector(ctx, sp, cfg, ef)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if got {
|
|
return nil
|
|
}
|
|
}
|
|
|
|
if canCreate {
|
|
sid, err := m.createSector(ctx, cfg, sp)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
m.nextDealSector = &sid
|
|
|
|
log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp)
|
|
if err := m.sectors.Send(uint64(sid), SectorStart{
|
|
ID: sid,
|
|
SectorType: sp,
|
|
}); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *Sealing) StartPackingSector(sid abi.SectorNumber) error {
|
|
m.startupWait.Wait()
|
|
|
|
log.Infow("starting to seal deal sector", "sector", sid, "trigger", "user")
|
|
return m.sectors.Send(uint64(sid), SectorStartPacking{})
|
|
}
|
|
|
|
func (m *Sealing) SectorAbortUpgrade(sid abi.SectorNumber) error {
|
|
m.startupWait.Wait()
|
|
|
|
m.inputLk.Lock()
|
|
// always do this early
|
|
delete(m.available, m.minerSectorID(sid))
|
|
m.inputLk.Unlock()
|
|
|
|
log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user")
|
|
return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")})
|
|
}
|
|
|
|
func (m *Sealing) SectorsStatus(ctx context.Context, sid abi.SectorNumber, showOnChainInfo bool) (api.SectorInfo, error) {
|
|
if showOnChainInfo {
|
|
return api.SectorInfo{}, xerrors.Errorf("on-chain info not supported")
|
|
}
|
|
|
|
info, err := m.GetSectorInfo(sid)
|
|
if err != nil {
|
|
return api.SectorInfo{}, err
|
|
}
|
|
|
|
deals := make([]abi.DealID, len(info.Pieces))
|
|
pieces := make([]api.SectorPiece, len(info.Pieces))
|
|
for i, piece := range info.Pieces {
|
|
pieces[i].Piece = piece.Piece
|
|
if piece.DealInfo == nil {
|
|
continue
|
|
}
|
|
|
|
pdi := *piece.DealInfo // copy
|
|
pieces[i].DealInfo = &pdi
|
|
|
|
deals[i] = piece.DealInfo.DealID
|
|
}
|
|
|
|
log := make([]api.SectorLog, len(info.Log))
|
|
for i, l := range info.Log {
|
|
log[i] = api.SectorLog{
|
|
Kind: l.Kind,
|
|
Timestamp: l.Timestamp,
|
|
Trace: l.Trace,
|
|
Message: l.Message,
|
|
}
|
|
}
|
|
|
|
sInfo := api.SectorInfo{
|
|
SectorID: sid,
|
|
State: api.SectorState(info.State),
|
|
CommD: info.CommD,
|
|
CommR: info.CommR,
|
|
Proof: info.Proof,
|
|
Deals: deals,
|
|
Pieces: pieces,
|
|
Ticket: api.SealTicket{
|
|
Value: info.TicketValue,
|
|
Epoch: info.TicketEpoch,
|
|
},
|
|
Seed: api.SealSeed{
|
|
Value: info.SeedValue,
|
|
Epoch: info.SeedEpoch,
|
|
},
|
|
PreCommitMsg: info.PreCommitMessage,
|
|
CommitMsg: info.CommitMessage,
|
|
Retries: info.InvalidProofs,
|
|
ToUpgrade: false,
|
|
ReplicaUpdateMessage: info.ReplicaUpdateMessage,
|
|
|
|
LastErr: info.LastErr,
|
|
Log: log,
|
|
// on chain info
|
|
SealProof: info.SectorType,
|
|
Activation: 0,
|
|
Expiration: 0,
|
|
DealWeight: big.Zero(),
|
|
VerifiedDealWeight: big.Zero(),
|
|
InitialPledge: big.Zero(),
|
|
OnTime: 0,
|
|
Early: 0,
|
|
}
|
|
|
|
return sInfo, nil
|
|
}
|
|
|
|
func proposalCID(deal api.PieceDealInfo) cid.Cid {
|
|
pc, err := deal.DealProposal.Cid()
|
|
if err != nil {
|
|
log.Errorf("DealProposal.Cid error: %+v", err)
|
|
return cid.Undef
|
|
}
|
|
|
|
return pc
|
|
}
|
|
|
|
var _ sectorblocks.SectorBuilder = &Sealing{}
|