diff --git a/cmd/lotus-miner/info.go b/cmd/lotus-miner/info.go index 32219e73b..f6629fcf4 100644 --- a/cmd/lotus-miner/info.go +++ b/cmd/lotus-miner/info.go @@ -466,6 +466,7 @@ var stateOrder = map[sealing.SectorState]stateMeta{} var stateList = []stateMeta{ {col: 39, state: "Total"}, {col: color.FgGreen, state: sealing.Proving}, + {col: color.FgGreen, state: sealing.Available}, {col: color.FgGreen, state: sealing.UpdateActivating}, {col: color.FgBlue, state: sealing.Empty}, diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index 24098b558..150c0f2c6 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -351,7 +351,7 @@ var sectorsListCmd = &cli.Command{ if cctx.Bool("unproven") { for state := range sealing.ExistSectorStateList { - if state == sealing.Proving { + if state == sealing.Proving || state == sealing.Available { continue } states = append(states, api.SectorState(state)) diff --git a/documentation/en/default-lotus-miner-config.toml b/documentation/en/default-lotus-miner-config.toml index 30dfd0268..d00d790c5 100644 --- a/documentation/en/default-lotus-miner-config.toml +++ b/documentation/en/default-lotus-miner-config.toml @@ -365,6 +365,12 @@ # env var: LOTUS_SEALING_FINALIZEEARLY #FinalizeEarly = false + # After sealing CC sectors, make them available for upgrading with deals + # + # type: bool + # env var: LOTUS_SEALING_MAKECCSECTORSAVAILABLE + #MakeCCSectorsAvailable = false + # Whether to use available miner balance for sector collateral instead of sending it with each message # # type: bool diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 2c50d1885..b85290f04 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -111,6 +111,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto Committing: planCommitting, CommitFinalize: planOne( on(SectorFinalized{}, SubmitCommit), + on(SectorFinalizedAvailable{}, SubmitCommit), on(SectorFinalizeFailed{}, CommitFinalizeFailed), ), SubmitCommit: planOne( @@ -136,6 +137,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto FinalizeSector: planOne( on(SectorFinalized{}, Proving), + on(SectorFinalizedAvailable{}, Available), on(SectorFinalizeFailed{}, FinalizeFailed), ), @@ -283,7 +285,11 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto Proving: planOne( on(SectorFaultReported{}, FaultReported), on(SectorFaulty{}, Faulty), + on(SectorMarkForUpdate{}, Available), + ), + Available: planOne( on(SectorStartCCUpdate{}, SnapDealsWaitDeals), + on(SectorAbortUpgrade{}, Proving), ), Terminating: planOne( on(SectorTerminating{}, TerminateWait), @@ -558,6 +564,8 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // Post-seal case Proving: return m.handleProvingSector, processed, nil + case Available: + return m.handleAvailableSector, processed, nil case Terminating: return m.handleTerminating, processed, nil case TerminateWait: diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index fc3b774f9..66497473f 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -286,6 +286,10 @@ type SectorFinalized struct{} func (evt SectorFinalized) apply(*SectorInfo) {} +type SectorFinalizedAvailable struct{} + +func (evt SectorFinalizedAvailable) apply(*SectorInfo) {} + type SectorRetryFinalize struct{} func (evt SectorRetryFinalize) apply(*SectorInfo) {} @@ -297,6 +301,10 @@ func (evt SectorFinalizeFailed) apply(*SectorInfo) {} // Snap deals // CC update path +type SectorMarkForUpdate struct{} + +func (evt SectorMarkForUpdate) apply(state *SectorInfo) {} + type SectorStartCCUpdate struct{} func (evt SectorStartCCUpdate) apply(state *SectorInfo) { diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index e644cd848..d2b51edc9 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -12,11 +12,14 @@ import ( "github.com/filecoin-project/go-commp-utils/zerocomm" "github.com/filecoin-project/go-padreader" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-storage/storage" "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/policy" + "github.com/filecoin-project/lotus/chain/types" sectorstorage "github.com/filecoin-project/lotus/extern/sector-storage" "github.com/filecoin-project/lotus/extern/sector-storage/ffiwrapper" "github.com/filecoin-project/lotus/extern/storage-sealing/sealiface" @@ -30,8 +33,8 @@ func (m *Sealing) handleWaitDeals(ctx statemachine.Context, sector SectorInfo) e m.inputLk.Lock() - if m.creating != nil && *m.creating == sector.SectorNumber { - m.creating = nil + if m.nextDealSector != nil && *m.nextDealSector == sector.SectorNumber { + m.nextDealSector = nil } sid := m.minerSectorID(sector.SectorNumber) @@ -384,8 +387,29 @@ func (m *Sealing) MatchPendingPiecesToOpenSectors(ctx context.Context) error { return m.updateInput(ctx, sp) } +type expFn func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) + // called with m.inputLk func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) error { + memo := make(map[abi.SectorNumber]struct { + e abi.ChainEpoch + p abi.TokenAmount + }) + expF := func(sn abi.SectorNumber) (abi.ChainEpoch, abi.TokenAmount, error) { + if e, ok := memo[sn]; ok { + return e.e, e.p, nil + } + onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{}) + if err != nil { + return 0, big.Zero(), err + } + memo[sn] = struct { + e abi.ChainEpoch + p abi.TokenAmount + }{e: onChainInfo.Expiration, p: onChainInfo.InitialPledge} + return onChainInfo.Expiration, onChainInfo.InitialPledge, nil + } + ssize, err := sp.SectorSize() if err != nil { return err @@ -411,19 +435,6 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e toAssign[proposalCid] = struct{}{} - memo := make(map[abi.SectorNumber]abi.ChainEpoch) - expF := func(sn abi.SectorNumber) (abi.ChainEpoch, error) { - if exp, ok := memo[sn]; ok { - return exp, nil - } - onChainInfo, err := m.Api.StateSectorGetInfo(ctx, m.maddr, sn, TipSetToken{}) - if err != nil { - return 0, err - } - memo[sn] = onChainInfo.Expiration - return onChainInfo.Expiration, nil - } - for id, sector := range m.openSectors { avail := abi.PaddedPieceSize(ssize).Unpadded() - sector.used // check that sector lifetime is long enough to fit deal using latest expiration from on chain @@ -434,7 +445,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e continue } if !ok { - exp, _ := expF(sector.number) + exp, _, _ := expF(sector.number) log.Infof("CC update sector %d cannot fit deal, expiration %d before deal end epoch %d", id, exp, piece.deal.DealProposal.EndEpoch) continue } @@ -497,7 +508,7 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e if len(toAssign) > 0 { log.Errorf("we are trying to create a new sector with open sectors %v", m.openSectors) - if err := m.tryCreateDealSector(ctx, sp); err != nil { + if err := m.tryGetDealSector(ctx, sp, expF); err != nil { log.Errorw("Failed to create a new sector for deals", "error", err) } } @@ -505,10 +516,113 @@ func (m *Sealing) updateInput(ctx context.Context, sp abi.RegisteredSealProof) e return nil } -func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSealProof) error { +func (m *Sealing) calcTargetExpiration(ctx context.Context, ssize abi.SectorSize) (minTarget, target abi.ChainEpoch, err error) { + var candidates []*pendingPiece + + for _, piece := range m.pendingPieces { + if piece.assigned { + continue // already assigned to a sector, skip + } + candidates = append(candidates, piece) + } + + // earliest expiration first + sort.Slice(candidates, func(i, j int) bool { + return candidates[i].deal.DealProposal.EndEpoch < candidates[j].deal.DealProposal.EndEpoch + }) + + var totalBytes uint64 + for _, candidate := range candidates { + totalBytes += uint64(candidate.size) + + if totalBytes >= uint64(abi.PaddedPieceSize(ssize).Unpadded()) { + return candidates[0].deal.DealProposal.EndEpoch, candidate.deal.DealProposal.EndEpoch, nil + } + } + + _, curEpoch, err := m.Api.ChainHead(ctx) + if err != nil { + return 0, 0, xerrors.Errorf("getting current epoch: %w", err) + } + + minDur, maxDur := policy.DealDurationBounds(0) + + return curEpoch + minDur, curEpoch + maxDur, nil +} + +func (m *Sealing) tryGetUpgradeSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) (bool, error) { + if len(m.available) == 0 { + return false, nil + } + + ssize, err := sp.SectorSize() + if err != nil { + return false, xerrors.Errorf("getting sector size: %w", err) + } + minExpiration, targetExpiration, err := m.calcTargetExpiration(ctx, ssize) + if err != nil { + return false, xerrors.Errorf("calculating min target expiration: %w", err) + } + + var candidate abi.SectorID + var bestExpiration abi.ChainEpoch + bestPledge := types.TotalFilecoinInt + + for s := range m.available { + expiration, pledge, err := ef(s.Number) + if err != nil { + log.Errorw("checking sector expiration", "error", err) + continue + } + + slowChecks := func(sid abi.SectorNumber) bool { + active, err := m.sectorActive(ctx, TipSetToken{}, sid) + if err != nil { + log.Errorw("checking sector active", "error", err) + return false + } + if !active { + log.Debugw("skipping available sector", "reason", "not active") + return false + } + return true + } + + // if best is below target, we want larger expirations + // if best is above target, we want lower pledge, but only if still above target + + if bestExpiration < targetExpiration { + if expiration > bestExpiration && slowChecks(s.Number) { + bestExpiration = expiration + bestPledge = pledge + candidate = s + } + continue + } + + if expiration >= targetExpiration && pledge.LessThan(bestPledge) && slowChecks(s.Number) { + bestExpiration = expiration + bestPledge = pledge + candidate = s + } + } + + if bestExpiration < minExpiration { + log.Infow("Not upgrading any sectors", "available", len(m.available), "pieces", len(m.pendingPieces), "bestExp", bestExpiration, "target", targetExpiration, "min", minExpiration, "candidate", candidate) + // didn't find a good sector / no sectors were available + return false, nil + } + + log.Infow("Upgrading sector", "number", candidate.Number, "type", "deal", "proofType", sp, "expiration", bestExpiration, "pledge", types.FIL(bestPledge)) + delete(m.available, candidate) + m.nextDealSector = &candidate.Number + return true, m.sectors.Send(uint64(candidate.Number), SectorStartCCUpdate{}) +} + +func (m *Sealing) tryGetDealSector(ctx context.Context, sp abi.RegisteredSealProof, ef expFn) error { m.startupWait.Wait() - if m.creating != nil { + if m.nextDealSector != nil { return nil // new sector is being created right now } @@ -517,10 +631,6 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return xerrors.Errorf("getting storage config: %w", err) } - if !cfg.MakeNewSectorForDeals { - return nil - } - if cfg.MaxSealingSectorsForDeals > 0 && m.stats.curSealing() >= cfg.MaxSealingSectorsForDeals { return nil } @@ -529,12 +639,24 @@ func (m *Sealing) tryCreateDealSector(ctx context.Context, sp abi.RegisteredSeal return nil } + got, err := m.tryGetUpgradeSector(ctx, sp, ef) + if err != nil { + return err + } + if got { + return nil + } + + if !cfg.MakeNewSectorForDeals { + return nil + } + sid, err := m.createSector(ctx, cfg, sp) if err != nil { return err } - m.creating = &sid + m.nextDealSector = &sid log.Infow("Creating sector", "number", sid, "type", "deal", "proofType", sp) return m.sectors.Send(uint64(sid), SectorStart{ @@ -573,6 +695,11 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error { func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { m.startupWait.Wait() + m.inputLk.Lock() + // always do this early + delete(m.available, m.minerSectorID(sid)) + m.inputLk.Unlock() + log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user") return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")}) } diff --git a/extern/storage-sealing/mocks/api.go b/extern/storage-sealing/mocks/api.go index 95c222ecd..efe89ff0b 100644 --- a/extern/storage-sealing/mocks/api.go +++ b/extern/storage-sealing/mocks/api.go @@ -9,6 +9,7 @@ import ( reflect "reflect" address "github.com/filecoin-project/go-address" + bitfield "github.com/filecoin-project/go-bitfield" abi "github.com/filecoin-project/go-state-types/abi" big "github.com/filecoin-project/go-state-types/big" crypto "github.com/filecoin-project/go-state-types/crypto" @@ -214,10 +215,10 @@ func (mr *MockSealingAPIMockRecorder) StateMarketStorageDealProposal(arg0, arg1, } // StateMinerActiveSectors mocks base method. -func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) { +func (m *MockSealingAPI) StateMinerActiveSectors(arg0 context.Context, arg1 address.Address, arg2 sealing.TipSetToken) (bitfield.BitField, error) { m.ctrl.T.Helper() ret := m.ctrl.Call(m, "StateMinerActiveSectors", arg0, arg1, arg2) - ret0, _ := ret[0].([]*miner.SectorOnChainInfo) + ret0, _ := ret[0].(bitfield.BitField) ret1, _ := ret[1].(error) return ret0, ret1 } diff --git a/extern/storage-sealing/sealiface/config.go b/extern/storage-sealing/sealiface/config.go index 852034aa7..20bd2b564 100644 --- a/extern/storage-sealing/sealiface/config.go +++ b/extern/storage-sealing/sealiface/config.go @@ -20,6 +20,8 @@ type Config struct { MakeNewSectorForDeals bool + MakeCCSectorsAvailable bool + WaitDealsDelay time.Duration CommittedCapacitySectorLifetime time.Duration diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 907d7cdfd..8f6cf3226 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -13,6 +13,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" @@ -63,7 +64,7 @@ type SealingAPI interface { StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerAvailableBalance(context.Context, address.Address, TipSetToken) (big.Int, error) StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) - StateMinerActiveSectors(context.Context, address.Address, TipSetToken) ([]*miner.SectorOnChainInfo, error) + StateMinerActiveSectors(context.Context, address.Address, TipSetToken) (bitfield.BitField, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (*api.MarketDeal, error) StateMarketStorageDealProposal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) @@ -104,10 +105,11 @@ type Sealing struct { sectorTimers map[abi.SectorID]*time.Timer pendingPieces map[cid.Cid]*pendingPiece assignedPieces map[abi.SectorID][]cid.Cid - creating *abi.SectorNumber // used to prevent a race where we could create a new sector more than once + nextDealSector *abi.SectorNumber // used to prevent a race where we could create a new sector more than once upgradeLk sync.Mutex toUpgrade map[abi.SectorNumber]struct{} + available map[abi.SectorID]struct{} notifee SectorStateNotifee addrSel AddrSel @@ -129,11 +131,11 @@ type openSector struct { maybeAccept func(cid.Cid) error // called with inputLk } -func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF func(sn abi.SectorNumber) (abi.ChainEpoch, error)) (bool, error) { +func (o *openSector) dealFitsInLifetime(dealEnd abi.ChainEpoch, expF expFn) (bool, error) { if !o.ccUpdate { return true, nil } - expiration, err := expF(o.number) + expiration, _, err := expF(o.number) if err != nil { return false, err } @@ -179,6 +181,8 @@ func New(mctx context.Context, api SealingAPI, fc config.MinerFeeConfig, events assignedPieces: map[abi.SectorID][]cid.Cid{}, toUpgrade: map[abi.SectorNumber]struct{}{}, + available: map[abi.SectorID]struct{}{}, + notifee: notifee, addrSel: as, diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 0f8228d02..7547f00a0 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -25,6 +25,7 @@ var ExistSectorStateList = map[SectorState]struct{}{ CommitAggregateWait: {}, FinalizeSector: {}, Proving: {}, + Available: {}, FailedUnrecoverable: {}, SealPreCommit1Failed: {}, SealPreCommit2Failed: {}, @@ -98,6 +99,7 @@ const ( FinalizeSector SectorState = "FinalizeSector" Proving SectorState = "Proving" + Available SectorState = "Available" // proving CC available for SnapDeals // snap deals / cc update SnapDealsWaitDeals SectorState = "SnapDealsWaitDeals" @@ -161,7 +163,7 @@ func toStatState(st SectorState, finEarly bool) statSectorState { return sstProving } return sstSealing - case Proving, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: + case Proving, Available, UpdateActivating, ReleaseSectorKey, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: return sstProving } diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index a1c3be460..90fa5090a 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -238,7 +238,7 @@ func (m *Sealing) handleSubmitReplicaUpdateFailed(ctx statemachine.Context, sect } // Abort upgrade for sectors that went faulty since being marked for upgrade - active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber) + active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber) if err != nil { log.Errorf("sector active check: api error, not proceeding: %+v", err) return nil diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index e74119976..afb5c54bf 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -130,6 +130,11 @@ func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) er func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInfo) error { // TODO: track sector health / expiration + m.inputLk.Lock() + // in case we revert into Proving without going into Available + delete(m.available, m.minerSectorID(sector.SectorNumber)) + m.inputLk.Unlock() + cfg, err := m.getConfig() if err != nil { return xerrors.Errorf("getting sealing config: %w", err) @@ -144,3 +149,13 @@ func (m *Sealing) handleProvingSector(ctx statemachine.Context, sector SectorInf return nil } + +func (m *Sealing) handleAvailableSector(ctx statemachine.Context, sector SectorInfo) error { + m.inputLk.Lock() + m.available[m.minerSectorID(sector.SectorNumber)] = struct{}{} + m.inputLk.Unlock() + // TODO: Watch termination + // TODO: Auto-extend if set + + return nil +} diff --git a/extern/storage-sealing/states_replica_update.go b/extern/storage-sealing/states_replica_update.go index bede7a5fa..8a4f05dc4 100644 --- a/extern/storage-sealing/states_replica_update.go +++ b/extern/storage-sealing/states_replica_update.go @@ -41,7 +41,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect log.Errorf("handleProveReplicaUpdate: api error, not proceeding: %+v", err) return nil } - active, err := sectorActive(ctx.Context(), m.Api, m.maddr, tok, sector.SectorNumber) + active, err := m.sectorActive(ctx.Context(), tok, sector.SectorNumber) if err != nil { log.Errorf("sector active check: api error, not proceeding: %+v", err) return nil diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 3dba325ee..f508717f7 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -782,5 +782,8 @@ func (m *Sealing) handleFinalizeSector(ctx statemachine.Context, sector SectorIn return ctx.Send(SectorFinalizeFailed{xerrors.Errorf("finalize sector: %w", err)}) } + if cfg.MakeCCSectorsAvailable && !sector.hasDeals() { + return ctx.Send(SectorFinalizedAvailable{}) + } return ctx.Send(SectorFinalized{}) } diff --git a/extern/storage-sealing/upgrade_queue.go b/extern/storage-sealing/upgrade_queue.go index 1e5bef67c..fe95a6aa8 100644 --- a/extern/storage-sealing/upgrade_queue.go +++ b/extern/storage-sealing/upgrade_queue.go @@ -3,14 +3,13 @@ package sealing import ( "context" - "github.com/filecoin-project/go-address" - "github.com/filecoin-project/lotus/chain/actors/builtin/miner" - market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market" - "golang.org/x/xerrors" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" + + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" + market7 "github.com/filecoin-project/specs-actors/v7/actors/builtin/market" ) func (m *Sealing) IsMarkedForUpgrade(id abi.SectorNumber) bool { @@ -50,17 +49,6 @@ func (m *Sealing) MarkForUpgrade(ctx context.Context, id abi.SectorNumber) error } func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) error { - cfg, err := m.getConfig() - if err != nil { - return xerrors.Errorf("getting storage config: %w", err) - } - - curStaging := m.stats.curStaging() - if cfg.MaxWaitDealsSectors > 0 && curStaging >= cfg.MaxWaitDealsSectors { - return xerrors.Errorf("already waiting for deals in %d >= %d (cfg.MaxWaitDealsSectors) sectors, no free resources to wait for deals in another", - curStaging, cfg.MaxWaitDealsSectors) - } - si, err := m.GetSectorInfo(id) if err != nil { return xerrors.Errorf("getting sector info: %w", err) @@ -70,11 +58,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e return xerrors.Errorf("can't mark sectors not in the 'Proving' state for upgrade") } - if len(si.Pieces) != 1 { - return xerrors.Errorf("not a committed-capacity sector, expected 1 piece") - } - - if si.Pieces[0].DealInfo != nil { + if si.hasDeals() { return xerrors.Errorf("not a committed-capacity sector, has deals") } @@ -87,7 +71,7 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e return xerrors.Errorf("failed to read sector on chain info: %w", err) } - active, err := sectorActive(ctx, m.Api, m.maddr, tok, id) + active, err := m.sectorActive(ctx, tok, id) if err != nil { return xerrors.Errorf("failed to check if sector is active") } @@ -100,24 +84,16 @@ func (m *Sealing) MarkForSnapUpgrade(ctx context.Context, id abi.SectorNumber) e "Upgrade expiration before marking for upgrade", id, onChainInfo.Expiration) } - return m.sectors.Send(uint64(id), SectorStartCCUpdate{}) + return m.sectors.Send(uint64(id), SectorMarkForUpdate{}) } -func sectorActive(ctx context.Context, api SealingAPI, maddr address.Address, tok TipSetToken, sector abi.SectorNumber) (bool, error) { - active, err := api.StateMinerActiveSectors(ctx, maddr, tok) +func (m *Sealing) sectorActive(ctx context.Context, tok TipSetToken, sector abi.SectorNumber) (bool, error) { + active, err := m.Api.StateMinerActiveSectors(ctx, m.maddr, tok) if err != nil { return false, xerrors.Errorf("failed to check active sectors: %w", err) } - // Ensure the upgraded sector is active - var found bool - for _, si := range active { - if si.SectorNumber == sector { - found = true - break - } - } - return found, nil + return active.IsSet(uint64(sector)) } func (m *Sealing) tryUpgradeSector(ctx context.Context, params *miner.SectorPreCommitInfo) big.Int { diff --git a/itests/ccupgrade_test.go b/itests/ccupgrade_test.go index 6e7a5d090..53e1ac703 100644 --- a/itests/ccupgrade_test.go +++ b/itests/ccupgrade_test.go @@ -10,12 +10,14 @@ import ( "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" - "github.com/filecoin-project/lotus/api" - "github.com/filecoin-project/lotus/chain/types" - "github.com/filecoin-project/lotus/itests/kit" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/types" + sealing "github.com/filecoin-project/lotus/extern/storage-sealing" + "github.com/filecoin-project/lotus/itests/kit" ) func TestCCUpgrade(t *testing.T) { @@ -48,9 +50,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) fmt.Printf("CCUpgrade: %d\n", CCUpgrade) - // wait for deadline 0 to pass so that committing starts after post on preseals - // this gives max time for post to complete minimizing chances of timeout - // waitForDeadline(ctx, t, 1, client, maddr) miner.PledgeSectors(ctx, 1, 0, nil) sl, err := miner.SectorsList(ctx) require.NoError(t, err) @@ -89,18 +88,6 @@ func runTestCCUpgrade(t *testing.T) *kit.TestFullNode { return client } -func waitForDeadline(ctx context.Context, t *testing.T, waitIdx uint64, node *kit.TestFullNode, maddr address.Address) { - for { - ts, err := node.ChainHead(ctx) - require.NoError(t, err) - dl, err := node.StateMinerProvingDeadline(ctx, maddr, ts.Key()) - require.NoError(t, err) - if dl.Index == waitIdx { - return - } - } -} - func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, node *kit.TestFullNode, maddr address.Address) { for { active, err := node.StateMinerActiveSectors(ctx, maddr, types.EmptyTSK) @@ -116,18 +103,6 @@ func waitForSectorActive(ctx context.Context, t *testing.T, sn abi.SectorNumber, } } -func waitForSectorStartUpgrade(ctx context.Context, t *testing.T, sn abi.SectorNumber, miner *kit.TestMiner) { - for { - si, err := miner.StorageMiner.SectorsStatus(ctx, sn, false) - require.NoError(t, err) - if si.State != api.SectorState("Proving") { - t.Logf("Done proving sector in state: %s", si.State) - return - } - - } -} - func TestCCUpgradeAndPoSt(t *testing.T) { kit.QuietMiningLogs() t.Run("upgrade and then post", func(t *testing.T) { @@ -148,13 +123,13 @@ func TestCCUpgradeAndPoSt(t *testing.T) { }) } -func TestTooManyMarkedForUpgrade(t *testing.T) { +func TestAbortUpgradeAvailable(t *testing.T) { kit.QuietMiningLogs() ctx := context.Background() blockTime := 1 * time.Millisecond - client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15)) + client, miner, ens := kit.EnsembleMinimal(t, kit.GenesisNetworkVersion(network.Version15), kit.ThroughRPC()) ens.InterconnectAll().BeginMiningMustPost(blockTime) maddr, err := miner.ActorAddress(ctx) @@ -163,32 +138,53 @@ func TestTooManyMarkedForUpgrade(t *testing.T) { } CCUpgrade := abi.SectorNumber(kit.DefaultPresealsPerBootstrapMiner + 1) - waitForDeadline(ctx, t, 1, client, maddr) - miner.PledgeSectors(ctx, 3, 0, nil) + fmt.Printf("CCUpgrade: %d\n", CCUpgrade) + miner.PledgeSectors(ctx, 1, 0, nil) sl, err := miner.SectorsList(ctx) require.NoError(t, err) - require.Len(t, sl, 3, "expected 3 sectors") - + require.Len(t, sl, 1, "expected 1 sector") + require.Equal(t, CCUpgrade, sl[0], "unexpected sector number") { si, err := client.StateSectorGetInfo(ctx, maddr, CCUpgrade, types.EmptyTSK) require.NoError(t, err) require.Less(t, 50000, int(si.Expiration)) } - waitForSectorActive(ctx, t, CCUpgrade, client, maddr) - waitForSectorActive(ctx, t, CCUpgrade+1, client, maddr) - waitForSectorActive(ctx, t, CCUpgrade+2, client, maddr) - err = miner.SectorMarkForUpgrade(ctx, CCUpgrade, true) - require.NoError(t, err) - err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+1, true) + err = miner.SectorMarkForUpgrade(ctx, sl[0], true) require.NoError(t, err) - waitForSectorStartUpgrade(ctx, t, CCUpgrade, miner) - waitForSectorStartUpgrade(ctx, t, CCUpgrade+1, miner) + sl, err = miner.SectorsList(ctx) + require.NoError(t, err) + require.Len(t, sl, 1, "expected 1 sector") - err = miner.SectorMarkForUpgrade(ctx, CCUpgrade+2, true) - require.Error(t, err) - assert.Contains(t, err.Error(), "no free resources to wait for deals") + ss, err := miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Proving) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Available), ss.State) + break + } + + require.NoError(t, miner.SectorAbortUpgrade(ctx, sl[0])) + + for i := 0; i < 100; i++ { + ss, err = miner.SectorsStatus(ctx, sl[0], false) + require.NoError(t, err) + if ss.State == api.SectorState(sealing.Available) { + time.Sleep(50 * time.Millisecond) + continue + } + + require.Equal(t, api.SectorState(sealing.Proving), ss.State) + break + } } diff --git a/itests/kit/log.go b/itests/kit/log.go index 3dce3af9d..4ec610baf 100644 --- a/itests/kit/log.go +++ b/itests/kit/log.go @@ -15,5 +15,6 @@ func QuietMiningLogs() { _ = logging.SetLogLevel("storageminer", "ERROR") _ = logging.SetLogLevel("pubsub", "ERROR") _ = logging.SetLogLevel("gen", "ERROR") + _ = logging.SetLogLevel("rpc", "ERROR") _ = logging.SetLogLevel("dht/RtRefreshManager", "ERROR") } diff --git a/node/config/doc_gen.go b/node/config/doc_gen.go index edb4cf892..972c196f7 100644 --- a/node/config/doc_gen.go +++ b/node/config/doc_gen.go @@ -750,6 +750,12 @@ avoid the relatively high cost of unsealing the data later, at the cost of more Comment: `Run sector finalization before submitting sector proof to the chain`, }, + { + Name: "MakeCCSectorsAvailable", + Type: "bool", + + Comment: `After sealing CC sectors, make them available for upgrading with deals`, + }, { Name: "CollateralFromMinerBalance", Type: "bool", diff --git a/node/config/types.go b/node/config/types.go index 8320aea7e..2e9357993 100644 --- a/node/config/types.go +++ b/node/config/types.go @@ -250,6 +250,9 @@ type SealingConfig struct { // Run sector finalization before submitting sector proof to the chain FinalizeEarly bool + // After sealing CC sectors, make them available for upgrading with deals + MakeCCSectorsAvailable bool + // Whether to use available miner balance for sector collateral instead of sending it with each message CollateralFromMinerBalance bool // Minimum available balance to keep in the miner actor before sending it with messages diff --git a/node/modules/storageminer.go b/node/modules/storageminer.go index bd21ee746..05d41a427 100644 --- a/node/modules/storageminer.go +++ b/node/modules/storageminer.go @@ -897,6 +897,7 @@ func NewSetSealConfigFunc(r repo.LockedRepo) (dtypes.SetSealingConfigFunc, error MaxSealingSectorsForDeals: cfg.MaxSealingSectorsForDeals, CommittedCapacitySectorLifetime: config.Duration(cfg.CommittedCapacitySectorLifetime), WaitDealsDelay: config.Duration(cfg.WaitDealsDelay), + MakeCCSectorsAvailable: cfg.MakeCCSectorsAvailable, AlwaysKeepUnsealedCopy: cfg.AlwaysKeepUnsealedCopy, FinalizeEarly: cfg.FinalizeEarly, @@ -935,6 +936,7 @@ func ToSealingConfig(cfg *config.StorageMiner) sealiface.Config { MakeNewSectorForDeals: cfg.Dealmaking.MakeNewSectorForDeals, CommittedCapacitySectorLifetime: time.Duration(cfg.Sealing.CommittedCapacitySectorLifetime), WaitDealsDelay: time.Duration(cfg.Sealing.WaitDealsDelay), + MakeCCSectorsAvailable: cfg.Sealing.MakeCCSectorsAvailable, AlwaysKeepUnsealedCopy: cfg.Sealing.AlwaysKeepUnsealedCopy, FinalizeEarly: cfg.Sealing.FinalizeEarly, diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 01ff9d8d3..d976d9aa2 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -9,6 +9,7 @@ import ( "golang.org/x/xerrors" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" @@ -112,13 +113,25 @@ func (s SealingAPIAdapter) StateMinerSectorAllocated(ctx context.Context, maddr return s.delegate.StateMinerSectorAllocated(ctx, maddr, sid, tsk) } -func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) ([]*miner.SectorOnChainInfo, error) { +func (s SealingAPIAdapter) StateMinerActiveSectors(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (bitfield.BitField, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { - return nil, xerrors.Errorf("faile dto unmarshal TipSetToken to TipSetKey: %w", err) + return bitfield.BitField{}, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) } - return s.delegate.StateMinerActiveSectors(ctx, maddr, tsk) + act, err := s.delegate.StateGetActor(ctx, maddr, tsk) + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("getting miner actor: temp error: %+v", err) + } + + stor := store.ActorStore(ctx, blockstore.NewAPIBlockstore(s.delegate)) + + state, err := miner.Load(stor, act) + if err != nil { + return bitfield.BitField{}, xerrors.Errorf("loading miner state: %+v", err) + } + + return miner.AllPartSectors(state, miner.Partition.ActiveSectors) } func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (sealing.MsgLookup, error) {