diff --git a/api/api_storage.go b/api/api_storage.go index 6e6d75c28..58fb70635 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -113,6 +113,8 @@ type StorageMiner interface { // SectorCommitPending returns a list of pending Commit sectors to be sent in the next aggregate message SectorCommitPending(ctx context.Context) ([]abi.SectorID, error) //perm:admin SectorMatchPendingPiecesToOpenSectors(ctx context.Context) error //perm:admin + // SectorAbortUpgrade can be called on sectors that are in the process of being upgraded to abort it + SectorAbortUpgrade(context.Context, abi.SectorNumber) error //perm:admin // WorkerConnect tells the node to connect to workers RPC WorkerConnect(context.Context, string) error //perm:admin retry:true diff --git a/api/proxy_gen.go b/api/proxy_gen.go index 63dc4aac8..32f59e0ec 100644 --- a/api/proxy_gen.go +++ b/api/proxy_gen.go @@ -747,6 +747,8 @@ type StorageMinerStruct struct { SealingSchedDiag func(p0 context.Context, p1 bool) (interface{}, error) `perm:"admin"` + SectorAbortUpgrade func(p0 context.Context, p1 abi.SectorNumber) error `perm:"admin"` + SectorAddPieceToAny func(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) `perm:"admin"` SectorCommitFlush func(p0 context.Context) ([]sealiface.CommitBatchRes, error) `perm:"admin"` @@ -4402,6 +4404,17 @@ func (s *StorageMinerStub) SealingSchedDiag(p0 context.Context, p1 bool) (interf return nil, ErrNotSupported } +func (s *StorageMinerStruct) SectorAbortUpgrade(p0 context.Context, p1 abi.SectorNumber) error { + if s.Internal.SectorAbortUpgrade == nil { + return ErrNotSupported + } + return s.Internal.SectorAbortUpgrade(p0, p1) +} + +func (s *StorageMinerStub) SectorAbortUpgrade(p0 context.Context, p1 abi.SectorNumber) error { + return ErrNotSupported +} + func (s *StorageMinerStruct) SectorAddPieceToAny(p0 context.Context, p1 abi.UnpaddedPieceSize, p2 storage.Data, p3 PieceDealInfo) (SectorOffset, error) { if s.Internal.SectorAddPieceToAny == nil { return *new(SectorOffset), ErrNotSupported diff --git a/build/openrpc/full.json.gz b/build/openrpc/full.json.gz index 5bf44f1a4..70cc388d1 100644 Binary files a/build/openrpc/full.json.gz and b/build/openrpc/full.json.gz differ diff --git a/build/openrpc/miner.json.gz b/build/openrpc/miner.json.gz index 0423aa9d0..633f24b68 100644 Binary files a/build/openrpc/miner.json.gz and b/build/openrpc/miner.json.gz differ diff --git a/build/openrpc/worker.json.gz b/build/openrpc/worker.json.gz index 4d7906343..70d7b60b8 100644 Binary files a/build/openrpc/worker.json.gz and b/build/openrpc/worker.json.gz differ diff --git a/cmd/lotus-miner/sectors.go b/cmd/lotus-miner/sectors.go index be8170422..9c4a5d0d8 100644 --- a/cmd/lotus-miner/sectors.go +++ b/cmd/lotus-miner/sectors.go @@ -55,6 +55,7 @@ var sectorsCmd = &cli.Command{ sectorsTerminateCmd, sectorsRemoveCmd, sectorsSnapUpCmd, + sectorsSnapAbortCmd, sectorsMarkForUpgradeCmd, sectorsStartSealCmd, sectorsSealDelayCmd, @@ -1520,6 +1521,31 @@ var sectorsSnapUpCmd = &cli.Command{ }, } +var sectorsSnapAbortCmd = &cli.Command{ + Name: "abort-upgrade", + Usage: "Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before", + ArgsUsage: "", + Action: func(cctx *cli.Context) error { + if cctx.Args().Len() != 1 { + return lcli.ShowHelp(cctx, xerrors.Errorf("must pass sector number")) + } + + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64) + if err != nil { + return xerrors.Errorf("could not parse sector number: %w", err) + } + + return nodeApi.SectorAbortUpgrade(ctx, abi.SectorNumber(id)) + }, +} + var sectorsMarkForUpgradeCmd = &cli.Command{ Name: "mark-for-upgrade", Usage: "Mark a committed capacity sector for replacement by a sector with deals", diff --git a/documentation/en/api-v0-methods-miner.md b/documentation/en/api-v0-methods-miner.md index 386cbe1a0..dad3df010 100644 --- a/documentation/en/api-v0-methods-miner.md +++ b/documentation/en/api-v0-methods-miner.md @@ -112,6 +112,7 @@ * [SealingAbort](#SealingAbort) * [SealingSchedDiag](#SealingSchedDiag) * [Sector](#Sector) + * [SectorAbortUpgrade](#SectorAbortUpgrade) * [SectorAddPieceToAny](#SectorAddPieceToAny) * [SectorCommitFlush](#SectorCommitFlush) * [SectorCommitPending](#SectorCommitPending) @@ -1841,6 +1842,21 @@ Response: `{}` ## Sector +### SectorAbortUpgrade +SectorAbortUpgrade can be called on sectors that are in the process of being upgraded to abort it + + +Perms: admin + +Inputs: +```json +[ + 9 +] +``` + +Response: `{}` + ### SectorAddPieceToAny Add piece to an open sector. If no sectors with enough space are open, either a new sector will be created, or this call will block until more diff --git a/documentation/en/cli-lotus-miner.md b/documentation/en/cli-lotus-miner.md index 08fac332d..11155773a 100644 --- a/documentation/en/cli-lotus-miner.md +++ b/documentation/en/cli-lotus-miner.md @@ -1512,6 +1512,7 @@ COMMANDS: terminate Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector) remove Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty)) snap-up Mark a committed capacity sector to be filled with deals + abort-upgrade Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before mark-for-upgrade Mark a committed capacity sector for replacement by a sector with deals seal Manually start sealing a sector (filling any unused space with junk) set-seal-delay Set the time, in minutes, that a new sector waits for deals before sealing starts @@ -1747,6 +1748,19 @@ OPTIONS: ``` +### lotus-miner sectors abort-upgrade +``` +NAME: + lotus-miner sectors abort-upgrade - Abort the attempted (SnapDeals) upgrade of a CC sector, reverting it to as before + +USAGE: + lotus-miner sectors abort-upgrade [command options] + +OPTIONS: + --help, -h show help (default: false) + +``` + ### lotus-miner sectors mark-for-upgrade ``` NAME: diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 3525c84a7..56b0677c4 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -20,6 +20,7 @@ import ( // We should implement some wait-for-api logic type ErrApi struct{ error } +type ErrNoDeals struct{ error } type ErrInvalidDeals struct{ error } type ErrInvalidPiece struct{ error } type ErrExpiredDeals struct{ error } @@ -38,12 +39,14 @@ type ErrCommitWaitFailed struct{ error } type ErrBadRU struct{ error } type ErrBadPR struct{ error } -func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI) error { +func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api SealingAPI, mustHaveDeals bool) error { tok, height, err := api.ChainHead(ctx) if err != nil { return &ErrApi{xerrors.Errorf("getting chain head: %w", err)} } + dealCount := 0 + for i, p := range si.Pieces { // if no deal is associated with the piece, ensure that we added it as // filler (i.e. ensure that it has a zero PieceCID) @@ -55,6 +58,8 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api continue } + dealCount++ + proposal, err := api.StateMarketStorageDealProposal(ctx, p.DealInfo.DealID, tok) if err != nil { return &ErrInvalidDeals{xerrors.Errorf("getting deal %d for piece %d: %w", p.DealInfo.DealID, i, err)} @@ -77,13 +82,17 @@ func checkPieces(ctx context.Context, maddr address.Address, si SectorInfo, api } } + if mustHaveDeals && dealCount <= 0 { + return &ErrNoDeals{(xerrors.Errorf("sector %d must have deals, but does not", si.SectorNumber))} + } + return nil } // checkPrecommit checks that data commitment generated in the sealing process // matches pieces, and that the seal ticket isn't expired func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, height abi.ChainEpoch, api SealingAPI) (err error) { - if err := checkPieces(ctx, maddr, si, api); err != nil { + if err := checkPieces(ctx, maddr, si, api, false); err != nil { return err } @@ -184,7 +193,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, return &ErrInvalidProof{xerrors.New("invalid proof (compute error?)")} } - if err := checkPieces(ctx, m.maddr, si, m.Api); err != nil { + if err := checkPieces(ctx, m.maddr, si, m.Api, false); err != nil { return err } @@ -194,7 +203,7 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, // check that sector info is good after running a replica update func checkReplicaUpdate(ctx context.Context, maddr address.Address, si SectorInfo, tok TipSetToken, api SealingAPI) error { - if err := checkPieces(ctx, maddr, si, api); err != nil { + if err := checkPieces(ctx, maddr, si, api, true); err != nil { return err } if !si.CCUpdate { diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 83874e907..1b18efcc7 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -137,27 +137,32 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto SnapDealsWaitDeals: planOne( on(SectorAddPiece{}, SnapDealsAddPiece), on(SectorStartPacking{}, SnapDealsPacking), + on(SectorAbortUpgrade{}, AbortUpgrade), ), SnapDealsAddPiece: planOne( on(SectorPieceAdded{}, SnapDealsWaitDeals), apply(SectorStartPacking{}), apply(SectorAddPiece{}), on(SectorAddPieceFailed{}, SnapDealsAddPieceFailed), + on(SectorAbortUpgrade{}, AbortUpgrade), ), SnapDealsPacking: planOne( on(SectorPacked{}, UpdateReplica), + on(SectorAbortUpgrade{}, AbortUpgrade), ), UpdateReplica: planOne( on(SectorReplicaUpdate{}, ProveReplicaUpdate), on(SectorUpdateReplicaFailed{}, ReplicaUpdateFailed), on(SectorDealsExpired{}, SnapDealsDealsExpired), on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs), + on(SectorAbortUpgrade{}, AbortUpgrade), ), ProveReplicaUpdate: planOne( on(SectorProveReplicaUpdate{}, SubmitReplicaUpdate), on(SectorProveReplicaUpdateFailed{}, ReplicaUpdateFailed), on(SectorDealsExpired{}, SnapDealsDealsExpired), on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs), + on(SectorAbortUpgrade{}, AbortUpgrade), ), SubmitReplicaUpdate: planOne( on(SectorReplicaUpdateSubmitted{}, ReplicaUpdateWait), @@ -231,6 +236,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryWaitDeals{}, SnapDealsWaitDeals), apply(SectorStartPacking{}), apply(SectorAddPiece{}), + on(SectorAbortUpgrade{}, AbortUpgrade), ), SnapDealsDealsExpired: planOne( on(SectorAbortUpgrade{}, AbortUpgrade), @@ -249,6 +255,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryProveReplicaUpdate{}, ProveReplicaUpdate), on(SectorInvalidDealIDs{}, SnapDealsRecoverDealIDs), on(SectorDealsExpired{}, SnapDealsDealsExpired), + on(SectorAbortUpgrade{}, AbortUpgrade), ), // Post-seal diff --git a/extern/storage-sealing/input.go b/extern/storage-sealing/input.go index f3259f0cc..0afbaa369 100644 --- a/extern/storage-sealing/input.go +++ b/extern/storage-sealing/input.go @@ -524,6 +524,13 @@ func (m *Sealing) StartPacking(sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorStartPacking{}) } +func (m *Sealing) AbortUpgrade(sid abi.SectorNumber) error { + m.startupWait.Wait() + + log.Infow("aborting upgrade of sector", "sector", sid, "trigger", "user") + return m.sectors.Send(uint64(sid), SectorAbortUpgrade{xerrors.New("triggered by user")}) +} + func proposalCID(deal api.PieceDealInfo) cid.Cid { pc, err := deal.DealProposal.Cid() if err != nil { diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index c32ac4c3a..244d3e721 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -478,7 +478,7 @@ func (m *Sealing) HandleRecoverDealIDs(ctx statemachine.Context, sector SectorIn } func (m *Sealing) handleSnapDealsRecoverDealIDs(ctx statemachine.Context, sector SectorInfo) error { - return m.handleRecoverDealIDsOrFailWith(ctx, sector, SectorAbortUpgrade{}) + return m.handleRecoverDealIDsOrFailWith(ctx, sector, SectorAbortUpgrade{xerrors.New("failed recovering deal ids")}) } func recoveryPiecesToFix(ctx context.Context, api SealingAPI, sector SectorInfo, maddr address.Address) ([]int, int, error) { diff --git a/extern/storage-sealing/states_replica_update.go b/extern/storage-sealing/states_replica_update.go index 43d5467ed..cd3e43230 100644 --- a/extern/storage-sealing/states_replica_update.go +++ b/extern/storage-sealing/states_replica_update.go @@ -12,7 +12,7 @@ import ( ) func (m *Sealing) handleReplicaUpdate(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state return handleErrors(ctx, err, sector) } out, err := m.sealer.ReplicaUpdate(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.pieceInfos()) @@ -37,7 +37,7 @@ func (m *Sealing) handleProveReplicaUpdate(ctx statemachine.Context, sector Sect return ctx.Send(SectorProveReplicaUpdateFailed{xerrors.Errorf("prove replica update (1) failed: %w", err)}) } - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, true); err != nil { // Sanity check state return handleErrors(ctx, err, sector) } @@ -59,10 +59,6 @@ func (m *Sealing) handleSubmitReplicaUpdate(ctx statemachine.Context, sector Sec return nil } - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state - return handleErrors(ctx, err, sector) - } - if err := checkReplicaUpdate(ctx.Context(), m.maddr, sector, tok, m.Api); err != nil { return ctx.Send(SectorSubmitReplicaUpdateFailed{}) } @@ -196,8 +192,7 @@ func (m *Sealing) handleReplicaUpdateWait(ctx statemachine.Context, sector Secto } if !si.SealedCID.Equals(*sector.UpdateSealed) { - log.Errorf("mismatch of expected onchain sealed cid after replica update, expected %s got %s", sector.UpdateSealed, si.SealedCID) - return ctx.Send(SectorAbortUpgrade{}) + return ctx.Send(SectorAbortUpgrade{xerrors.Errorf("mismatch of expected onchain sealed cid after replica update, expected %s got %s", sector.UpdateSealed, si.SealedCID)}) } return ctx.Send(SectorReplicaUpdateLanded{}) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 2258250f4..3dba325ee 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -198,7 +198,7 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e } func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error { - if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api); err != nil { // Sanity check state + if err := checkPieces(ctx.Context(), m.maddr, sector, m.Api, false); err != nil { // Sanity check state switch err.(type) { case *ErrApi: log.Errorf("handlePreCommit1: api error, not proceeding: %+v", err) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index cfee53aed..f8b79267b 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -384,6 +384,10 @@ func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.Sect return sm.Miner.MarkForUpgrade(ctx, id, snap) } +func (sm *StorageMinerAPI) SectorAbortUpgrade(ctx context.Context, number abi.SectorNumber) error { + return sm.Miner.SectorAbortUpgrade(number) +} + func (sm *StorageMinerAPI) SectorCommitFlush(ctx context.Context) ([]sealiface.CommitBatchRes, error) { return sm.Miner.CommitFlush(ctx) } diff --git a/storage/miner_sealing.go b/storage/miner_sealing.go index d8ef26835..a22c32a40 100644 --- a/storage/miner_sealing.go +++ b/storage/miner_sealing.go @@ -86,6 +86,10 @@ func (m *Miner) IsMarkedForUpgrade(id abi.SectorNumber) bool { return m.sealing.IsMarkedForUpgrade(id) } +func (m *Miner) SectorAbortUpgrade(sectorNum abi.SectorNumber) error { + return m.sealing.AbortUpgrade(sectorNum) +} + func (m *Miner) SectorAddPieceToAny(ctx context.Context, size abi.UnpaddedPieceSize, r storage.Data, d api.PieceDealInfo) (api.SectorOffset, error) { return m.sealing.SectorAddPieceToAny(ctx, size, r, d) }