diff --git a/.circleci/config.yml b/.circleci/config.yml index f38c5ba29..04feeedf3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -200,6 +200,8 @@ jobs: <<: *test test-window-post: <<: *test + test-terminate: + <<: *test test-conformance: description: | Run tests using a corpus of interoperable test vectors for Filecoin @@ -476,9 +478,15 @@ workflows: test-suite-name: cli packages: "./cli/... ./cmd/... ./api/..." - test-window-post: + codecov-upload: true go-test-flags: "-run=TestWindowedPost" winpost-test: "1" test-suite-name: window-post + - test-terminate: + codecov-upload: true + go-test-flags: "-run=TestTerminate" + winpost-test: "1" + test-suite-name: terminate - test-short: go-test-flags: "--timeout 10m --short" test-suite-name: short diff --git a/api/api_storage.go b/api/api_storage.go index 85eb03115..042dad73b 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -65,7 +65,17 @@ type StorageMiner interface { // SectorGetExpectedSealDuration gets the expected time for a sector to seal SectorGetExpectedSealDuration(context.Context) (time.Duration, error) SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error + // SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can + // be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. SectorRemove(context.Context, abi.SectorNumber) error + // SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then + // automatically removes it from storage + SectorTerminate(context.Context, abi.SectorNumber) error + // SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. + // Returns null if message wasn't sent + SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) + // SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message + SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) @@ -217,9 +227,12 @@ const ( PreCommitAddr AddrUse = iota CommitAddr PoStAddr + + TerminateSectorsAddr ) type AddressConfig struct { PreCommitControl []address.Address CommitControl []address.Address + TerminateControl []address.Address } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 4bf1c0d01..1569f1b2a 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -314,6 +314,9 @@ type StorageMinerStruct struct { SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"` SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` + SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` + SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` + SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm @@ -1310,6 +1313,18 @@ func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.Sector return c.Internal.SectorRemove(ctx, number) } +func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.SectorNumber) error { + return c.Internal.SectorTerminate(ctx, number) +} + +func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) { + return c.Internal.SectorTerminateFlush(ctx) +} + +func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return c.Internal.SectorTerminatePending(ctx) +} + func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error { return c.Internal.SectorMarkForUpgrade(ctx, number) } diff --git a/api/test/window_post.go b/api/test/window_post.go index ff107ae8d..84d668c76 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/extern/sector-storage/mock" @@ -211,6 +212,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector } } + func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int, upgradeHeight abi.ChainEpoch) { ctx, cancel := context.WithCancel(context.Background()) @@ -428,3 +430,175 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, sectors = p.MinerPower.RawBytePower.Uint64() / uint64(ssz) require.Equal(t, nSectors+GenesisPreseals-2+1, int(sectors)) // -2 not recovered sectors + 1 just pledged } + +func TestTerminate(t *testing.T, b APIBuilder, blocktime time.Duration) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nSectors := uint64(2) + + n, sn := b(t, []FullNodeOpts{FullNodeWithActorsV2At(1)}, []StorageMiner{{Full: 0, Preseal: int(nSectors)}}) + + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + + addrinfo, err := client.NetAddrsListen(ctx) + if err != nil { + t.Fatal(err) + } + + if err := miner.NetConnect(ctx, addrinfo); err != nil { + t.Fatal(err) + } + build.Clock.Sleep(time.Second) + + done := make(chan struct{}) + go func() { + defer close(done) + for ctx.Err() == nil { + build.Clock.Sleep(blocktime) + if err := sn[0].MineOne(ctx, MineNext); err != nil { + if ctx.Err() != nil { + // context was canceled, ignore the error. + return + } + t.Error(err) + } + } + }() + defer func() { + cancel() + <-done + }() + + maddr, err := miner.ActorAddress(ctx) + require.NoError(t, err) + + ssz, err := miner.ActorSectorSize(ctx, maddr) + require.NoError(t, err) + + p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors)) + + fmt.Printf("Seal a sector\n") + + pledgeSectors(t, ctx, miner, 1, 0, nil) + + fmt.Printf("wait for power\n") + + { + // Wait until proven. + di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + + waitUntil := di.PeriodStart + di.WPoStProvingPeriod + 2 + fmt.Printf("End for head.Height > %d\n", waitUntil) + + for { + head, err := client.ChainHead(ctx) + require.NoError(t, err) + + if head.Height() > waitUntil { + fmt.Printf("Now head.Height = %d\n", head.Height()) + break + } + } + } + + nSectors++ + + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors)) + + fmt.Println("Terminate a sector") + + toTerminate := abi.SectorNumber(3) + + err = miner.SectorTerminate(ctx, toTerminate) + require.NoError(t, err) + + msgTriggerred := false +loop: + for { + si, err := miner.SectorsStatus(ctx, toTerminate, false) + require.NoError(t, err) + + fmt.Println("state: ", si.State, msgTriggerred) + + switch sealing.SectorState(si.State) { + case sealing.Terminating: + if !msgTriggerred { + { + p, err := miner.SectorTerminatePending(ctx) + require.NoError(t, err) + require.Len(t, p, 1) + require.Equal(t, abi.SectorNumber(3), p[0].Number) + } + + c, err := miner.SectorTerminateFlush(ctx) + require.NoError(t, err) + if c != nil { + msgTriggerred = true + fmt.Println("terminate message:", c) + + { + p, err := miner.SectorTerminatePending(ctx) + require.NoError(t, err) + require.Len(t, p, 0) + } + } + } + case sealing.TerminateWait, sealing.TerminateFinality, sealing.Removed: + break loop + } + + time.Sleep(100 * time.Millisecond) + } + + // check power decreased + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1))) + + // check in terminated set + { + parts, err := client.StateMinerPartitions(ctx, maddr, 1, types.EmptyTSK) + require.NoError(t, err) + require.Greater(t, len(parts), 0) + + bflen := func(b bitfield.BitField) uint64 { + l, err := b.Count() + require.NoError(t, err) + return l + } + + require.Equal(t, uint64(1), bflen(parts[0].AllSectors)) + require.Equal(t, uint64(0), bflen(parts[0].LiveSectors)) + } + + di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + for { + head, err := client.ChainHead(ctx) + require.NoError(t, err) + + if head.Height() > di.PeriodStart+di.WPoStProvingPeriod+2 { + fmt.Printf("Now head.Height = %d\n", head.Height()) + break + } + build.Clock.Sleep(blocktime) + } + require.NoError(t, err) + fmt.Printf("End for head.Height > %d\n", di.PeriodStart+di.WPoStProvingPeriod+2) + + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1))) +} diff --git a/build/version.go b/build/version.go index 140456581..fd13100a5 100644 --- a/build/version.go +++ b/build/version.go @@ -84,7 +84,7 @@ func VersionForType(nodeType NodeType) (Version, error) { // semver versions of the rpc api exposed var ( FullAPIVersion = newVer(1, 0, 0) - MinerAPIVersion = newVer(1, 0, 0) + MinerAPIVersion = newVer(1, 0, 1) WorkerAPIVersion = newVer(1, 0, 0) ) diff --git a/chain/actors/builtin/miner/miner.go b/chain/actors/builtin/miner/miner.go index 5821d092b..066dc9bfd 100644 --- a/chain/actors/builtin/miner/miner.go +++ b/chain/actors/builtin/miner/miner.go @@ -20,6 +20,7 @@ import ( builtin0 "github.com/filecoin-project/specs-actors/actors/builtin" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" ) func init() { @@ -42,6 +43,10 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff const MinSectorExpiration = miner0.MinSectorExpiration +// Not used / checked in v0 +var DeclarationsMax = miner2.DeclarationsMax +var AddressedSectorsMax = miner2.AddressedSectorsMax + func Load(store adt.Store, act *types.Actor) (st State, err error) { switch act.Code { case builtin0.StorageMinerActorCodeID: diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index ed74da96b..8c6297339 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -298,6 +298,10 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: sealing.CommitWait}, {col: color.FgYellow, state: sealing.FinalizeSector}, + {col: color.FgCyan, state: sealing.Terminating}, + {col: color.FgCyan, state: sealing.TerminateWait}, + {col: color.FgCyan, state: sealing.TerminateFinality}, + {col: color.FgCyan, state: sealing.TerminateFailed}, {col: color.FgCyan, state: sealing.Removing}, {col: color.FgCyan, state: sealing.Removed}, diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 1c3e4858c..5ef067b2c 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -35,6 +35,7 @@ var sectorsCmd = &cli.Command{ sectorsRefsCmd, sectorsUpdateCmd, sectorsPledgeCmd, + sectorsTerminateCmd, sectorsRemoveCmd, sectorsMarkForUpgradeCmd, sectorsStartSealCmd, @@ -396,9 +397,123 @@ var sectorsRefsCmd = &cli.Command{ }, } +var sectorsTerminateCmd = &cli.Command{ + Name: "terminate", + Usage: "Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "pass this flag if you know what you are doing", + }, + }, + Subcommands: []*cli.Command{ + sectorsTerminateFlushCmd, + sectorsTerminatePendingCmd, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("pass --really-do-it to confirm this action") + } + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + if cctx.Args().Len() != 1 { + return xerrors.Errorf("must pass sector number") + } + + id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64) + if err != nil { + return xerrors.Errorf("could not parse sector number: %w", err) + } + + return nodeApi.SectorTerminate(ctx, abi.SectorNumber(id)) + }, +} + +var sectorsTerminateFlushCmd = &cli.Command{ + Name: "flush", + Usage: "Send a terminate message if there are sectors queued for termination", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + + mcid, err := nodeApi.SectorTerminateFlush(ctx) + if err != nil { + return err + } + + if mcid == nil { + return xerrors.New("no sectors were queued for termination") + } + + fmt.Println(mcid) + + return nil + }, +} + +var sectorsTerminatePendingCmd = &cli.Command{ + Name: "pending", + Usage: "List sector numbers of sectors pending termination", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + api, nCloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer nCloser() + ctx := lcli.ReqContext(cctx) + + pending, err := nodeApi.SectorTerminatePending(ctx) + if err != nil { + return err + } + + maddr, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + + dl, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting proving deadline info failed: %w", err) + } + + for _, id := range pending { + loc, err := api.StateSectorPartition(ctx, maddr, id.Number, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("finding sector partition: %w", err) + } + + fmt.Print(id.Number) + + if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain) + loc.Deadline == dl.Index || // not in current + (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous + fmt.Print(" (in proving window)") + } + fmt.Println() + } + + return nil + }, +} + var sectorsRemoveCmd = &cli.Command{ Name: "remove", - Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)", + Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))", ArgsUsage: "", Flags: []cli.Flag{ &cli.BoolFlag{ diff --git a/documentation/en/api-methods-miner.md b/documentation/en/api-methods-miner.md index 0a6f8ec27..66512a02c 100644 --- a/documentation/en/api-methods-miner.md +++ b/documentation/en/api-methods-miner.md @@ -99,6 +99,9 @@ * [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration) * [SectorSetSealDelay](#SectorSetSealDelay) * [SectorStartSealing](#SectorStartSealing) + * [SectorTerminate](#SectorTerminate) + * [SectorTerminateFlush](#SectorTerminateFlush) + * [SectorTerminatePending](#SectorTerminatePending) * [Sectors](#Sectors) * [SectorsList](#SectorsList) * [SectorsListInStates](#SectorsListInStates) @@ -193,7 +196,8 @@ Response: ```json { "PreCommitControl": null, - "CommitControl": null + "CommitControl": null, + "TerminateControl": null } ``` @@ -1475,7 +1479,9 @@ Inputs: Response: `{}` ### SectorRemove -There are not yet any comments for this method. +SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can +be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. + Perms: admin @@ -1535,6 +1541,43 @@ Inputs: Response: `{}` +### SectorTerminate +SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then +automatically removes it from storage + + +Perms: admin + +Inputs: +```json +[ + 9 +] +``` + +Response: `{}` + +### SectorTerminateFlush +SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. +Returns null if message wasn't sent + + +Perms: admin + +Inputs: `null` + +Response: `null` + +### SectorTerminatePending +SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message + + +Perms: admin + +Inputs: `null` + +Response: `null` + ## Sectors diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 78765d7b4..70be08ace 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{183}); err != nil { + if _, err := w.Write([]byte{184, 25}); err != nil { return err } @@ -928,6 +928,50 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } + // t.TerminateMessage (cid.Cid) (struct) + if len("TerminateMessage") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TerminateMessage\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminateMessage"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("TerminateMessage")); err != nil { + return err + } + + if t.TerminateMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.TerminateMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.TerminateMessage: %w", err) + } + } + + // t.TerminatedAt (abi.ChainEpoch) (int64) + if len("TerminatedAt") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TerminatedAt\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminatedAt"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("TerminatedAt")); err != nil { + return err + } + + if t.TerminatedAt >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.TerminatedAt)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.TerminatedAt-1)); err != nil { + return err + } + } + // t.LastErr (string) (string) if len("LastErr") > cbg.MaxLength { return xerrors.Errorf("Value in field \"LastErr\" was too long") @@ -1441,6 +1485,55 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Return = ReturnState(sval) } + // t.TerminateMessage (cid.Cid) (struct) + case "TerminateMessage": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.TerminateMessage: %w", err) + } + + t.TerminateMessage = &c + } + + } + // t.TerminatedAt (abi.ChainEpoch) (int64) + case "TerminatedAt": + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.TerminatedAt = abi.ChainEpoch(extraI) + } // t.LastErr (string) (string) case "LastErr": diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index ea4982d2c..c989d0296 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -148,6 +148,21 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorFaultReported{}, FaultReported), on(SectorFaulty{}, Faulty), ), + Terminating: planOne( + on(SectorTerminating{}, TerminateWait), + on(SectorTerminateFailed{}, TerminateFailed), + ), + TerminateWait: planOne( + on(SectorTerminated{}, TerminateFinality), + on(SectorTerminateFailed{}, TerminateFailed), + ), + TerminateFinality: planOne( + on(SectorTerminateFailed{}, TerminateFailed), + // SectorRemove (global) + ), + TerminateFailed: planOne( + // SectorTerminating (global) + ), Removing: planOne( on(SectorRemoved{}, Removed), on(SectorRemoveFailed{}, RemoveFailed), @@ -328,6 +343,14 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // Post-seal case Proving: return m.handleProvingSector, processed, nil + case Terminating: + return m.handleTerminating, processed, nil + case TerminateWait: + return m.handleTerminateWait, processed, nil + case TerminateFinality: + return m.handleTerminateFinality, processed, nil + case TerminateFailed: + return m.handleTerminateFailed, processed, nil case Removing: return m.handleRemoving, processed, nil case Removed: diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 59f5e77e6..e28366721 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -314,6 +314,32 @@ func (evt SectorFaultReported) apply(state *SectorInfo) { type SectorFaultedFinal struct{} +// Terminating + +type SectorTerminate struct{} + +func (evt SectorTerminate) applyGlobal(state *SectorInfo) bool { + state.State = Terminating + return true +} + +type SectorTerminating struct{ Message *cid.Cid } + +func (evt SectorTerminating) apply(state *SectorInfo) { + state.TerminateMessage = evt.Message +} + +type SectorTerminated struct{ TerminatedAt abi.ChainEpoch } + +func (evt SectorTerminated) apply(state *SectorInfo) { + state.TerminatedAt = evt.TerminatedAt +} + +type SectorTerminateFailed struct{ error } + +func (evt SectorTerminateFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorTerminateFailed) apply(*SectorInfo) {} + // External events type SectorRemove struct{} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 92beb8ddf..96d63efdc 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/network" statemachine "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-storage/storage" @@ -60,6 +61,8 @@ type SealingAPI interface { StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) + StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) + StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) @@ -94,12 +97,15 @@ type Sealing struct { stats SectorStats + terminator *TerminateBatcher + getConfig GetSealingConfigFunc } type FeeConfig struct { MaxPreCommitGasFee abi.TokenAmount MaxCommitGasFee abi.TokenAmount + MaxTerminateGasFee abi.TokenAmount } type UnsealedSectorMap struct { @@ -136,6 +142,8 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds notifee: notifee, addrSel: as, + terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc), + getConfig: gc, stats: SectorStats{ @@ -160,7 +168,14 @@ func (m *Sealing) Run(ctx context.Context) error { } func (m *Sealing) Stop(ctx context.Context) error { - return m.sectors.Stop(ctx) + if err := m.terminator.Stop(ctx); err != nil { + return err + } + + if err := m.sectors.Stop(ctx); err != nil { + return err + } + return nil } func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { @@ -265,6 +280,18 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } +func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorTerminate{}) +} + +func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) { + return m.terminator.Flush(ctx) +} + +func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.terminator.Pending(ctx) +} + // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index ed32a110b..49a607958 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -30,6 +30,10 @@ var ExistSectorStateList = map[SectorState]struct{}{ Faulty: {}, FaultReported: {}, FaultedFinal: {}, + Terminating: {}, + TerminateWait: {}, + TerminateFinality: {}, + TerminateFailed: {}, Removing: {}, RemoveFailed: {}, Removed: {}, @@ -69,6 +73,11 @@ const ( FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain FaultedFinal SectorState = "FaultedFinal" // fault declared on chain + Terminating SectorState = "Terminating" + TerminateWait SectorState = "TerminateWait" + TerminateFinality SectorState = "TerminateFinality" + TerminateFailed SectorState = "TerminateFailed" + Removing SectorState = "Removing" RemoveFailed SectorState = "RemoveFailed" Removed SectorState = "Removed" @@ -78,7 +87,7 @@ func toStatState(st SectorState) statSectorState { switch st { case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: return sstSealing - case Proving, Removed, Removing: + case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: return sstProving } diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index e425606de..4be654721 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -309,6 +309,22 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRemove{}) } +func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error { + // ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to + // the Terminating state after cooldown. If the API is still failing, well get back to here + // with the error in SectorInfo log. + pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if pci != nil { + return nil // pause the fsm, needs manual user action + } + + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorTerminate{}) +} + func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { // First make vary sure the sector isn't committed si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index de7e6c8d0..212fd906f 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -1,9 +1,14 @@ package sealing import ( + "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine" + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/policy" ) func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { @@ -31,6 +36,89 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorFaultedFinal{}) } +func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) error { + // First step of sector termination + // * See if sector is live + // * If not, goto removing + // * Add to termination queue + // * Wait for message to land on-chain + // * Check for correct termination + // * wait for expiration (+winning lookback?) + + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)}) + } + + if si == nil { + // either already terminated or not committed yet + + pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)}) + } + if pci != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")}) + } + + return ctx.Send(SectorRemove{}) + } + + termCid, terminated, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber)) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)}) + } + + if terminated { + return ctx.Send(SectorTerminating{Message: nil}) + } + + return ctx.Send(SectorTerminating{Message: &termCid}) +} + +func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error { + if sector.TerminateMessage == nil { + return xerrors.New("entered TerminateWait with nil TerminateMessage") + } + + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)}) + } + + if mw.Receipt.ExitCode != exitcode.Ok { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("terminate message failed to execute: exit %d: %w", mw.Receipt.ExitCode, err)}) + } + + return ctx.Send(SectorTerminated{TerminatedAt: mw.Height}) +} + +func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error { + for { + tok, epoch, err := m.api.ChainHead(ctx.Context()) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)}) + } + + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)}) + } + + if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) { + return ctx.Send(SectorRemove{}) + } + + toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second + select { + case <-time.After(toWait): + continue + case <-ctx.Context().Done(): + return ctx.Context().Err() + } + } +} + func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error { if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { return ctx.Send(SectorRemoveFailed{err}) diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go new file mode 100644 index 000000000..31ccef93c --- /dev/null +++ b/extern/storage-sealing/terminate_batch.go @@ -0,0 +1,351 @@ +package sealing + +import ( + "bytes" + "context" + "sort" + "sync" + "time" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/dline" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" +) + +var ( + // TODO: config + + TerminateBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k + TerminateBatchMin uint64 = 1 + TerminateBatchWait = 5 * time.Minute +) + +type TerminateBatcherApi interface { + StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) + SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) + StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) + StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) + StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error) +} + +type TerminateBatcher struct { + api TerminateBatcherApi + maddr address.Address + mctx context.Context + addrSel AddrSel + feeCfg FeeConfig + + todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField + + waiting map[abi.SectorNumber][]chan cid.Cid + + notify, stop, stopped chan struct{} + force chan chan *cid.Cid + lk sync.Mutex +} + +func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher { + b := &TerminateBatcher{ + api: api, + maddr: maddr, + mctx: mctx, + addrSel: addrSel, + feeCfg: feeCfg, + + todo: map[SectorLocation]*bitfield.BitField{}, + waiting: map[abi.SectorNumber][]chan cid.Cid{}, + + notify: make(chan struct{}, 1), + force: make(chan chan *cid.Cid), + stop: make(chan struct{}), + stopped: make(chan struct{}), + } + + go b.run() + + return b +} + +func (b *TerminateBatcher) run() { + var forceRes chan *cid.Cid + var lastMsg *cid.Cid + + for { + if forceRes != nil { + forceRes <- lastMsg + forceRes = nil + } + lastMsg = nil + + var sendAboveMax, sendAboveMin bool + select { + case <-b.stop: + close(b.stopped) + return + case <-b.notify: + sendAboveMax = true + case <-time.After(TerminateBatchWait): + sendAboveMin = true + case fr := <-b.force: // user triggered + forceRes = fr + } + + var err error + lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) + if err != nil { + log.Warnw("TerminateBatcher processBatch error", "error", err) + } + } +} + +func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { + dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) + if err != nil { + return nil, xerrors.Errorf("getting proving deadline info failed: %w", err) + } + + b.lk.Lock() + defer b.lk.Unlock() + params := miner2.TerminateSectorsParams{} + + var total uint64 + for loc, sectors := range b.todo { + n, err := sectors.Count() + if err != nil { + log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + // don't send terminations for currently challenged sectors + if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain) + loc.Deadline == dl.Index || // not in current + (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous + continue + } + + if n < 1 { + log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition) + continue + } + + toTerminate, err := sectors.Copy() + if err != nil { + log.Warnw("TerminateBatcher: copy sectors bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + if total+n > uint64(miner.AddressedSectorsMax) { + n = uint64(miner.AddressedSectorsMax) - total + + toTerminate, err = toTerminate.Slice(0, n) + if err != nil { + log.Warnw("TerminateBatcher: slice toTerminate bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + s, err := bitfield.SubtractBitField(*sectors, toTerminate) + if err != nil { + log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + *sectors = s + } + + total += n + + params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{ + Deadline: loc.Deadline, + Partition: loc.Partition, + Sectors: toTerminate, + }) + + if total >= uint64(miner.AddressedSectorsMax) { + break + } + + if len(params.Terminations) >= miner.DeclarationsMax { + break + } + } + + if len(params.Terminations) == 0 { + return nil, nil // nothing to do + } + + if notif && total < TerminateBatchMax { + return nil, nil + } + + if after && total < TerminateBatchMin { + return nil, nil + } + + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err) + } + + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + return nil, xerrors.Errorf("couldn't get miner info: %w", err) + } + + from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee) + if err != nil { + return nil, xerrors.Errorf("no good address found: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes()) + if err != nil { + return nil, xerrors.Errorf("sending message failed: %w", err) + } + log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations)) + + for _, t := range params.Terminations { + delete(b.todo, SectorLocation{ + Deadline: t.Deadline, + Partition: t.Partition, + }) + + err := t.Sectors.ForEach(func(sn uint64) error { + for _, ch := range b.waiting[abi.SectorNumber(sn)] { + ch <- mcid // buffered + } + delete(b.waiting, abi.SectorNumber(sn)) + + return nil + }) + if err != nil { + return nil, xerrors.Errorf("sectors foreach: %w", err) + } + } + + return &mcid, nil +} + +// register termination, wait for batch message, return message CID +// can return cid.Undef,true if the sector is already terminated on-chain +func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (mcid cid.Cid, terminated bool, err error) { + maddr, err := address.NewIDAddress(uint64(s.Miner)) + if err != nil { + return cid.Undef, false, err + } + + loc, err := b.api.StateSectorPartition(ctx, maddr, s.Number, nil) + if err != nil { + return cid.Undef, false, xerrors.Errorf("getting sector location: %w", err) + } + if loc == nil { + return cid.Undef, false, xerrors.New("sector location not found") + } + + { + // check if maybe already terminated + parts, err := b.api.StateMinerPartitions(ctx, maddr, loc.Deadline, nil) + if err != nil { + return cid.Cid{}, false, xerrors.Errorf("getting partitions: %w", err) + } + live, err := parts[loc.Partition].LiveSectors.IsSet(uint64(s.Number)) + if err != nil { + return cid.Cid{}, false, xerrors.Errorf("checking if sector is in live set: %w", err) + } + if !live { + // already terminated + return cid.Undef, true, nil + } + } + + b.lk.Lock() + bf, ok := b.todo[*loc] + if !ok { + n := bitfield.New() + bf = &n + b.todo[*loc] = bf + } + bf.Set(uint64(s.Number)) + + sent := make(chan cid.Cid, 1) + b.waiting[s.Number] = append(b.waiting[s.Number], sent) + + select { + case b.notify <- struct{}{}: + default: // already have a pending notification, don't need more + } + b.lk.Unlock() + + select { + case c := <-sent: + return c, false, nil + case <-ctx.Done(): + return cid.Undef, false, ctx.Err() + } +} + +func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) { + resCh := make(chan *cid.Cid, 1) + select { + case b.force <- resCh: + select { + case res := <-resCh: + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + case <-ctx.Done(): + return nil, ctx.Err() + } +} + +func (b *TerminateBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) { + b.lk.Lock() + defer b.lk.Unlock() + + mid, err := address.IDFromAddress(b.maddr) + if err != nil { + return nil, err + } + + res := make([]abi.SectorID, 0) + for _, bf := range b.todo { + err := bf.ForEach(func(id uint64) error { + res = append(res, abi.SectorID{ + Miner: abi.ActorID(mid), + Number: abi.SectorNumber(id), + }) + return nil + }) + if err != nil { + return nil, err + } + } + + sort.Slice(res, func(i, j int) bool { + if res[i].Miner != res[j].Miner { + return res[i].Miner < res[j].Miner + } + + return res[i].Number < res[j].Number + }) + + return res, nil +} + +func (b *TerminateBatcher) Stop(ctx context.Context) error { + close(b.stop) + + select { + case <-b.stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 8f3e82a0b..1d5073622 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -103,6 +103,10 @@ type SectorInfo struct { // Recovery Return ReturnState + // Termination + TerminateMessage *cid.Cid + TerminatedAt abi.ChainEpoch + // Debug LastErr string diff --git a/node/config/def.go b/node/config/def.go index 68371c384..a20e0ceaa 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -69,6 +69,7 @@ type SealingConfig struct { type MinerFeeConfig struct { MaxPreCommitGasFee types.FIL MaxCommitGasFee types.FIL + MaxTerminateGasFee types.FIL MaxWindowPoStGasFee types.FIL MaxPublishDealsFee types.FIL MaxMarketBalanceAddFee types.FIL @@ -211,6 +212,7 @@ func DefaultStorageMiner() *StorageMiner { Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"), + MaxTerminateGasFee: types.MustParseFIL("0.5"), MaxWindowPoStGasFee: types.MustParseFIL("5"), MaxPublishDealsFee: types.MustParseFIL("0.05"), MaxMarketBalanceAddFee: types.MustParseFIL("0.007"), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 7c1328361..fe79817a5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -328,6 +328,18 @@ func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber return sm.Miner.RemoveSector(ctx, id) } +func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error { + return sm.Miner.TerminateSector(ctx, id) +} + +func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) { + return sm.Miner.TerminateFlush(ctx) +} + +func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return sm.Miner.TerminatePending(ctx) +} + func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { return sm.Miner.MarkForUpgrade(id) } diff --git a/node/node_test.go b/node/node_test.go index 0baa047da..ecc0914ae 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -164,6 +164,20 @@ func TestWindowedPost(t *testing.T) { test.TestWindowPost(t, builder.MockSbBuilder, 2*time.Millisecond, 10) } +func TestTerminate(t *testing.T) { + if os.Getenv("LOTUS_TEST_WINDOW_POST") != "1" { + t.Skip("this takes a few minutes, set LOTUS_TEST_WINDOW_POST=1 to run") + } + + logging.SetLogLevel("miner", "ERROR") + logging.SetLogLevel("chainstore", "ERROR") + logging.SetLogLevel("chain", "ERROR") + logging.SetLogLevel("sub", "ERROR") + logging.SetLogLevel("storageminer", "ERROR") + + test.TestTerminate(t, builder.MockSbBuilder, 2*time.Millisecond) +} + func TestCCUpgrade(t *testing.T) { logging.SetLogLevel("miner", "ERROR") logging.SetLogLevel("chainstore", "ERROR") diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 071ad30df..20bf30825 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -4,8 +4,6 @@ import ( "bytes" "context" - "github.com/filecoin-project/go-state-types/network" - "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -14,6 +12,8 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/go-state-types/network" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" @@ -243,6 +243,15 @@ func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr addre return nil, nil // not found } +func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tok sealing.TipSetToken) ([]api.Partition, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk) +} + func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { @@ -266,6 +275,15 @@ func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok sealing. return s.delegate.StateNetworkVersion(ctx, tsk) } +func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (*dline.Info, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, err + } + + return s.delegate.StateMinerProvingDeadline(ctx, maddr, tsk) +} + func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) { msg := types.Message{ To: to, diff --git a/storage/addresses.go b/storage/addresses.go index 5da8643cd..f406394b4 100644 --- a/storage/addresses.go +++ b/storage/addresses.go @@ -30,6 +30,8 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi m addrs = append(addrs, as.PreCommitControl...) case api.CommitAddr: addrs = append(addrs, as.CommitControl...) + case api.TerminateSectorsAddr: + addrs = append(addrs, as.TerminateControl...) default: defaultCtl := map[address.Address]struct{}{} for _, a := range mi.ControlAddresses { diff --git a/storage/miner.go b/storage/miner.go index a0d5a6a92..752d7ff42 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -148,6 +148,7 @@ func (m *Miner) Run(ctx context.Context) error { fc := sealing.FeeConfig{ MaxPreCommitGasFee: abi.TokenAmount(m.feeCfg.MaxPreCommitGasFee), MaxCommitGasFee: abi.TokenAmount(m.feeCfg.MaxCommitGasFee), + MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee), } evts := events.NewEvents(ctx, m.api) diff --git a/storage/sealing.go b/storage/sealing.go index 2cd454e5b..d07a14810 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -4,6 +4,8 @@ import ( "context" "io" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -44,6 +46,18 @@ func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error { return m.sealing.Remove(ctx, id) } +func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error { + return m.sealing.Terminate(ctx, id) +} + +func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) { + return m.sealing.TerminateFlush(ctx) +} + +func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.sealing.TerminatePending(ctx) +} + func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { return m.sealing.MarkForUpgrade(id) }