Merge pull request #5341 from filecoin-project/feat/sectors-terminate
Sector termination support
This commit is contained in:
commit
476df99179
@ -200,6 +200,8 @@ jobs:
|
||||
<<: *test
|
||||
test-window-post:
|
||||
<<: *test
|
||||
test-terminate:
|
||||
<<: *test
|
||||
test-conformance:
|
||||
description: |
|
||||
Run tests using a corpus of interoperable test vectors for Filecoin
|
||||
@ -476,9 +478,15 @@ workflows:
|
||||
test-suite-name: cli
|
||||
packages: "./cli/... ./cmd/... ./api/..."
|
||||
- test-window-post:
|
||||
codecov-upload: true
|
||||
go-test-flags: "-run=TestWindowedPost"
|
||||
winpost-test: "1"
|
||||
test-suite-name: window-post
|
||||
- test-terminate:
|
||||
codecov-upload: true
|
||||
go-test-flags: "-run=TestTerminate"
|
||||
winpost-test: "1"
|
||||
test-suite-name: terminate
|
||||
- test-short:
|
||||
go-test-flags: "--timeout 10m --short"
|
||||
test-suite-name: short
|
||||
|
@ -65,7 +65,17 @@ type StorageMiner interface {
|
||||
// SectorGetExpectedSealDuration gets the expected time for a sector to seal
|
||||
SectorGetExpectedSealDuration(context.Context) (time.Duration, error)
|
||||
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
|
||||
// SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
|
||||
// be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.
|
||||
SectorRemove(context.Context, abi.SectorNumber) error
|
||||
// SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then
|
||||
// automatically removes it from storage
|
||||
SectorTerminate(context.Context, abi.SectorNumber) error
|
||||
// SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
|
||||
// Returns null if message wasn't sent
|
||||
SectorTerminateFlush(ctx context.Context) (*cid.Cid, error)
|
||||
// SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
|
||||
SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error)
|
||||
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
|
||||
|
||||
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
|
||||
@ -217,9 +227,12 @@ const (
|
||||
PreCommitAddr AddrUse = iota
|
||||
CommitAddr
|
||||
PoStAddr
|
||||
|
||||
TerminateSectorsAddr
|
||||
)
|
||||
|
||||
type AddressConfig struct {
|
||||
PreCommitControl []address.Address
|
||||
CommitControl []address.Address
|
||||
TerminateControl []address.Address
|
||||
}
|
||||
|
@ -314,6 +314,9 @@ type StorageMinerStruct struct {
|
||||
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
|
||||
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
|
||||
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
|
||||
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
|
||||
SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"`
|
||||
SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"`
|
||||
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
|
||||
|
||||
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
|
||||
@ -1310,6 +1313,18 @@ func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.Sector
|
||||
return c.Internal.SectorRemove(ctx, number)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.SectorNumber) error {
|
||||
return c.Internal.SectorTerminate(ctx, number)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
return c.Internal.SectorTerminateFlush(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return c.Internal.SectorTerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error {
|
||||
return c.Internal.SectorMarkForUpgrade(ctx, number)
|
||||
}
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
"github.com/stretchr/testify/require"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
|
||||
@ -211,6 +212,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int,
|
||||
upgradeHeight abi.ChainEpoch) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
@ -428,3 +430,175 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration,
|
||||
sectors = p.MinerPower.RawBytePower.Uint64() / uint64(ssz)
|
||||
require.Equal(t, nSectors+GenesisPreseals-2+1, int(sectors)) // -2 not recovered sectors + 1 just pledged
|
||||
}
|
||||
|
||||
func TestTerminate(t *testing.T, b APIBuilder, blocktime time.Duration) {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
|
||||
nSectors := uint64(2)
|
||||
|
||||
n, sn := b(t, []FullNodeOpts{FullNodeWithActorsV2At(1)}, []StorageMiner{{Full: 0, Preseal: int(nSectors)}})
|
||||
|
||||
client := n[0].FullNode.(*impl.FullNodeAPI)
|
||||
miner := sn[0]
|
||||
|
||||
addrinfo, err := client.NetAddrsListen(ctx)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
if err := miner.NetConnect(ctx, addrinfo); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
build.Clock.Sleep(time.Second)
|
||||
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
defer close(done)
|
||||
for ctx.Err() == nil {
|
||||
build.Clock.Sleep(blocktime)
|
||||
if err := sn[0].MineOne(ctx, MineNext); err != nil {
|
||||
if ctx.Err() != nil {
|
||||
// context was canceled, ignore the error.
|
||||
return
|
||||
}
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
defer func() {
|
||||
cancel()
|
||||
<-done
|
||||
}()
|
||||
|
||||
maddr, err := miner.ActorAddress(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
ssz, err := miner.ActorSectorSize(ctx, maddr)
|
||||
require.NoError(t, err)
|
||||
|
||||
p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, p.MinerPower, p.TotalPower)
|
||||
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors))
|
||||
|
||||
fmt.Printf("Seal a sector\n")
|
||||
|
||||
pledgeSectors(t, ctx, miner, 1, 0, nil)
|
||||
|
||||
fmt.Printf("wait for power\n")
|
||||
|
||||
{
|
||||
// Wait until proven.
|
||||
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
waitUntil := di.PeriodStart + di.WPoStProvingPeriod + 2
|
||||
fmt.Printf("End for head.Height > %d\n", waitUntil)
|
||||
|
||||
for {
|
||||
head, err := client.ChainHead(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
if head.Height() > waitUntil {
|
||||
fmt.Printf("Now head.Height = %d\n", head.Height())
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
nSectors++
|
||||
|
||||
p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, p.MinerPower, p.TotalPower)
|
||||
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors))
|
||||
|
||||
fmt.Println("Terminate a sector")
|
||||
|
||||
toTerminate := abi.SectorNumber(3)
|
||||
|
||||
err = miner.SectorTerminate(ctx, toTerminate)
|
||||
require.NoError(t, err)
|
||||
|
||||
msgTriggerred := false
|
||||
loop:
|
||||
for {
|
||||
si, err := miner.SectorsStatus(ctx, toTerminate, false)
|
||||
require.NoError(t, err)
|
||||
|
||||
fmt.Println("state: ", si.State, msgTriggerred)
|
||||
|
||||
switch sealing.SectorState(si.State) {
|
||||
case sealing.Terminating:
|
||||
if !msgTriggerred {
|
||||
{
|
||||
p, err := miner.SectorTerminatePending(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, p, 1)
|
||||
require.Equal(t, abi.SectorNumber(3), p[0].Number)
|
||||
}
|
||||
|
||||
c, err := miner.SectorTerminateFlush(ctx)
|
||||
require.NoError(t, err)
|
||||
if c != nil {
|
||||
msgTriggerred = true
|
||||
fmt.Println("terminate message:", c)
|
||||
|
||||
{
|
||||
p, err := miner.SectorTerminatePending(ctx)
|
||||
require.NoError(t, err)
|
||||
require.Len(t, p, 0)
|
||||
}
|
||||
}
|
||||
}
|
||||
case sealing.TerminateWait, sealing.TerminateFinality, sealing.Removed:
|
||||
break loop
|
||||
}
|
||||
|
||||
time.Sleep(100 * time.Millisecond)
|
||||
}
|
||||
|
||||
// check power decreased
|
||||
p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, p.MinerPower, p.TotalPower)
|
||||
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1)))
|
||||
|
||||
// check in terminated set
|
||||
{
|
||||
parts, err := client.StateMinerPartitions(ctx, maddr, 1, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
require.Greater(t, len(parts), 0)
|
||||
|
||||
bflen := func(b bitfield.BitField) uint64 {
|
||||
l, err := b.Count()
|
||||
require.NoError(t, err)
|
||||
return l
|
||||
}
|
||||
|
||||
require.Equal(t, uint64(1), bflen(parts[0].AllSectors))
|
||||
require.Equal(t, uint64(0), bflen(parts[0].LiveSectors))
|
||||
}
|
||||
|
||||
di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
for {
|
||||
head, err := client.ChainHead(ctx)
|
||||
require.NoError(t, err)
|
||||
|
||||
if head.Height() > di.PeriodStart+di.WPoStProvingPeriod+2 {
|
||||
fmt.Printf("Now head.Height = %d\n", head.Height())
|
||||
break
|
||||
}
|
||||
build.Clock.Sleep(blocktime)
|
||||
}
|
||||
require.NoError(t, err)
|
||||
fmt.Printf("End for head.Height > %d\n", di.PeriodStart+di.WPoStProvingPeriod+2)
|
||||
|
||||
p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK)
|
||||
require.NoError(t, err)
|
||||
|
||||
require.Equal(t, p.MinerPower, p.TotalPower)
|
||||
require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1)))
|
||||
}
|
||||
|
@ -84,7 +84,7 @@ func VersionForType(nodeType NodeType) (Version, error) {
|
||||
// semver versions of the rpc api exposed
|
||||
var (
|
||||
FullAPIVersion = newVer(1, 0, 0)
|
||||
MinerAPIVersion = newVer(1, 0, 0)
|
||||
MinerAPIVersion = newVer(1, 0, 1)
|
||||
WorkerAPIVersion = newVer(1, 0, 0)
|
||||
)
|
||||
|
||||
|
@ -20,6 +20,7 @@ import (
|
||||
builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
|
||||
miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
|
||||
builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
|
||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -42,6 +43,10 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff
|
||||
|
||||
const MinSectorExpiration = miner0.MinSectorExpiration
|
||||
|
||||
// Not used / checked in v0
|
||||
var DeclarationsMax = miner2.DeclarationsMax
|
||||
var AddressedSectorsMax = miner2.AddressedSectorsMax
|
||||
|
||||
func Load(store adt.Store, act *types.Actor) (st State, err error) {
|
||||
switch act.Code {
|
||||
case builtin0.StorageMinerActorCodeID:
|
||||
|
@ -298,6 +298,10 @@ var stateList = []stateMeta{
|
||||
{col: color.FgYellow, state: sealing.CommitWait},
|
||||
{col: color.FgYellow, state: sealing.FinalizeSector},
|
||||
|
||||
{col: color.FgCyan, state: sealing.Terminating},
|
||||
{col: color.FgCyan, state: sealing.TerminateWait},
|
||||
{col: color.FgCyan, state: sealing.TerminateFinality},
|
||||
{col: color.FgCyan, state: sealing.TerminateFailed},
|
||||
{col: color.FgCyan, state: sealing.Removing},
|
||||
{col: color.FgCyan, state: sealing.Removed},
|
||||
|
||||
|
@ -35,6 +35,7 @@ var sectorsCmd = &cli.Command{
|
||||
sectorsRefsCmd,
|
||||
sectorsUpdateCmd,
|
||||
sectorsPledgeCmd,
|
||||
sectorsTerminateCmd,
|
||||
sectorsRemoveCmd,
|
||||
sectorsMarkForUpgradeCmd,
|
||||
sectorsStartSealCmd,
|
||||
@ -396,9 +397,123 @@ var sectorsRefsCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsTerminateCmd = &cli.Command{
|
||||
Name: "terminate",
|
||||
Usage: "Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)",
|
||||
ArgsUsage: "<sectorNum>",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "really-do-it",
|
||||
Usage: "pass this flag if you know what you are doing",
|
||||
},
|
||||
},
|
||||
Subcommands: []*cli.Command{
|
||||
sectorsTerminateFlushCmd,
|
||||
sectorsTerminatePendingCmd,
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if !cctx.Bool("really-do-it") {
|
||||
return xerrors.Errorf("pass --really-do-it to confirm this action")
|
||||
}
|
||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
if cctx.Args().Len() != 1 {
|
||||
return xerrors.Errorf("must pass sector number")
|
||||
}
|
||||
|
||||
id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not parse sector number: %w", err)
|
||||
}
|
||||
|
||||
return nodeApi.SectorTerminate(ctx, abi.SectorNumber(id))
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsTerminateFlushCmd = &cli.Command{
|
||||
Name: "flush",
|
||||
Usage: "Send a terminate message if there are sectors queued for termination",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
mcid, err := nodeApi.SectorTerminateFlush(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if mcid == nil {
|
||||
return xerrors.New("no sectors were queued for termination")
|
||||
}
|
||||
|
||||
fmt.Println(mcid)
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsTerminatePendingCmd = &cli.Command{
|
||||
Name: "pending",
|
||||
Usage: "List sector numbers of sectors pending termination",
|
||||
Action: func(cctx *cli.Context) error {
|
||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
api, nCloser, err := lcli.GetFullNodeAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer nCloser()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
|
||||
pending, err := nodeApi.SectorTerminatePending(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
maddr, err := nodeApi.ActorAddress(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
dl, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("getting proving deadline info failed: %w", err)
|
||||
}
|
||||
|
||||
for _, id := range pending {
|
||||
loc, err := api.StateSectorPartition(ctx, maddr, id.Number, types.EmptyTSK)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("finding sector partition: %w", err)
|
||||
}
|
||||
|
||||
fmt.Print(id.Number)
|
||||
|
||||
if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain)
|
||||
loc.Deadline == dl.Index || // not in current
|
||||
(loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous
|
||||
fmt.Print(" (in proving window)")
|
||||
}
|
||||
fmt.Println()
|
||||
}
|
||||
|
||||
return nil
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsRemoveCmd = &cli.Command{
|
||||
Name: "remove",
|
||||
Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)",
|
||||
Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))",
|
||||
ArgsUsage: "<sectorNum>",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
|
@ -99,6 +99,9 @@
|
||||
* [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration)
|
||||
* [SectorSetSealDelay](#SectorSetSealDelay)
|
||||
* [SectorStartSealing](#SectorStartSealing)
|
||||
* [SectorTerminate](#SectorTerminate)
|
||||
* [SectorTerminateFlush](#SectorTerminateFlush)
|
||||
* [SectorTerminatePending](#SectorTerminatePending)
|
||||
* [Sectors](#Sectors)
|
||||
* [SectorsList](#SectorsList)
|
||||
* [SectorsListInStates](#SectorsListInStates)
|
||||
@ -193,7 +196,8 @@ Response:
|
||||
```json
|
||||
{
|
||||
"PreCommitControl": null,
|
||||
"CommitControl": null
|
||||
"CommitControl": null,
|
||||
"TerminateControl": null
|
||||
}
|
||||
```
|
||||
|
||||
@ -1475,7 +1479,9 @@ Inputs:
|
||||
Response: `{}`
|
||||
|
||||
### SectorRemove
|
||||
There are not yet any comments for this method.
|
||||
SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
|
||||
be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
@ -1535,6 +1541,43 @@ Inputs:
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### SectorTerminate
|
||||
SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then
|
||||
automatically removes it from storage
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs:
|
||||
```json
|
||||
[
|
||||
9
|
||||
]
|
||||
```
|
||||
|
||||
Response: `{}`
|
||||
|
||||
### SectorTerminateFlush
|
||||
SectorTerminateFlush immediately sends a terminate message with sectors batched for termination.
|
||||
Returns null if message wasn't sent
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs: `null`
|
||||
|
||||
Response: `null`
|
||||
|
||||
### SectorTerminatePending
|
||||
SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message
|
||||
|
||||
|
||||
Perms: admin
|
||||
|
||||
Inputs: `null`
|
||||
|
||||
Response: `null`
|
||||
|
||||
## Sectors
|
||||
|
||||
|
||||
|
95
extern/storage-sealing/cbor_gen.go
vendored
95
extern/storage-sealing/cbor_gen.go
vendored
@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{183}); err != nil {
|
||||
if _, err := w.Write([]byte{184, 25}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -928,6 +928,50 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.TerminateMessage (cid.Cid) (struct)
|
||||
if len("TerminateMessage") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"TerminateMessage\" was too long")
|
||||
}
|
||||
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminateMessage"))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string("TerminateMessage")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.TerminateMessage == nil {
|
||||
if _, err := w.Write(cbg.CborNull); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteCidBuf(scratch, w, *t.TerminateMessage); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.TerminateMessage: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// t.TerminatedAt (abi.ChainEpoch) (int64)
|
||||
if len("TerminatedAt") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"TerminatedAt\" was too long")
|
||||
}
|
||||
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminatedAt"))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string("TerminatedAt")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.TerminatedAt >= 0 {
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.TerminatedAt)); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.TerminatedAt-1)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// t.LastErr (string) (string)
|
||||
if len("LastErr") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"LastErr\" was too long")
|
||||
@ -1441,6 +1485,55 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
|
||||
|
||||
t.Return = ReturnState(sval)
|
||||
}
|
||||
// t.TerminateMessage (cid.Cid) (struct)
|
||||
case "TerminateMessage":
|
||||
|
||||
{
|
||||
|
||||
b, err := br.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != cbg.CborNull[0] {
|
||||
if err := br.UnreadByte(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := cbg.ReadCid(br)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.TerminateMessage: %w", err)
|
||||
}
|
||||
|
||||
t.TerminateMessage = &c
|
||||
}
|
||||
|
||||
}
|
||||
// t.TerminatedAt (abi.ChainEpoch) (int64)
|
||||
case "TerminatedAt":
|
||||
{
|
||||
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||
var extraI int64
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch maj {
|
||||
case cbg.MajUnsignedInt:
|
||||
extraI = int64(extra)
|
||||
if extraI < 0 {
|
||||
return fmt.Errorf("int64 positive overflow")
|
||||
}
|
||||
case cbg.MajNegativeInt:
|
||||
extraI = int64(extra)
|
||||
if extraI < 0 {
|
||||
return fmt.Errorf("int64 negative oveflow")
|
||||
}
|
||||
extraI = -1 - extraI
|
||||
default:
|
||||
return fmt.Errorf("wrong type for int64 field: %d", maj)
|
||||
}
|
||||
|
||||
t.TerminatedAt = abi.ChainEpoch(extraI)
|
||||
}
|
||||
// t.LastErr (string) (string)
|
||||
case "LastErr":
|
||||
|
||||
|
23
extern/storage-sealing/fsm.go
vendored
23
extern/storage-sealing/fsm.go
vendored
@ -148,6 +148,21 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
||||
on(SectorFaultReported{}, FaultReported),
|
||||
on(SectorFaulty{}, Faulty),
|
||||
),
|
||||
Terminating: planOne(
|
||||
on(SectorTerminating{}, TerminateWait),
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
),
|
||||
TerminateWait: planOne(
|
||||
on(SectorTerminated{}, TerminateFinality),
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
),
|
||||
TerminateFinality: planOne(
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
// SectorRemove (global)
|
||||
),
|
||||
TerminateFailed: planOne(
|
||||
// SectorTerminating (global)
|
||||
),
|
||||
Removing: planOne(
|
||||
on(SectorRemoved{}, Removed),
|
||||
on(SectorRemoveFailed{}, RemoveFailed),
|
||||
@ -328,6 +343,14 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
// Post-seal
|
||||
case Proving:
|
||||
return m.handleProvingSector, processed, nil
|
||||
case Terminating:
|
||||
return m.handleTerminating, processed, nil
|
||||
case TerminateWait:
|
||||
return m.handleTerminateWait, processed, nil
|
||||
case TerminateFinality:
|
||||
return m.handleTerminateFinality, processed, nil
|
||||
case TerminateFailed:
|
||||
return m.handleTerminateFailed, processed, nil
|
||||
case Removing:
|
||||
return m.handleRemoving, processed, nil
|
||||
case Removed:
|
||||
|
26
extern/storage-sealing/fsm_events.go
vendored
26
extern/storage-sealing/fsm_events.go
vendored
@ -314,6 +314,32 @@ func (evt SectorFaultReported) apply(state *SectorInfo) {
|
||||
|
||||
type SectorFaultedFinal struct{}
|
||||
|
||||
// Terminating
|
||||
|
||||
type SectorTerminate struct{}
|
||||
|
||||
func (evt SectorTerminate) applyGlobal(state *SectorInfo) bool {
|
||||
state.State = Terminating
|
||||
return true
|
||||
}
|
||||
|
||||
type SectorTerminating struct{ Message *cid.Cid }
|
||||
|
||||
func (evt SectorTerminating) apply(state *SectorInfo) {
|
||||
state.TerminateMessage = evt.Message
|
||||
}
|
||||
|
||||
type SectorTerminated struct{ TerminatedAt abi.ChainEpoch }
|
||||
|
||||
func (evt SectorTerminated) apply(state *SectorInfo) {
|
||||
state.TerminatedAt = evt.TerminatedAt
|
||||
}
|
||||
|
||||
type SectorTerminateFailed struct{ error }
|
||||
|
||||
func (evt SectorTerminateFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
|
||||
func (evt SectorTerminateFailed) apply(*SectorInfo) {}
|
||||
|
||||
// External events
|
||||
|
||||
type SectorRemove struct{}
|
||||
|
29
extern/storage-sealing/sealing.go
vendored
29
extern/storage-sealing/sealing.go
vendored
@ -19,6 +19,7 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
statemachine "github.com/filecoin-project/go-statemachine"
|
||||
"github.com/filecoin-project/specs-storage/storage"
|
||||
@ -60,6 +61,8 @@ type SealingAPI interface {
|
||||
StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error)
|
||||
StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error)
|
||||
StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error)
|
||||
StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error)
|
||||
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error)
|
||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||
ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error)
|
||||
ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error)
|
||||
@ -94,12 +97,15 @@ type Sealing struct {
|
||||
|
||||
stats SectorStats
|
||||
|
||||
terminator *TerminateBatcher
|
||||
|
||||
getConfig GetSealingConfigFunc
|
||||
}
|
||||
|
||||
type FeeConfig struct {
|
||||
MaxPreCommitGasFee abi.TokenAmount
|
||||
MaxCommitGasFee abi.TokenAmount
|
||||
MaxTerminateGasFee abi.TokenAmount
|
||||
}
|
||||
|
||||
type UnsealedSectorMap struct {
|
||||
@ -136,6 +142,8 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
||||
notifee: notifee,
|
||||
addrSel: as,
|
||||
|
||||
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc),
|
||||
|
||||
getConfig: gc,
|
||||
|
||||
stats: SectorStats{
|
||||
@ -160,7 +168,14 @@ func (m *Sealing) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (m *Sealing) Stop(ctx context.Context) error {
|
||||
return m.sectors.Stop(ctx)
|
||||
if err := m.terminator.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.sectors.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
|
||||
@ -265,6 +280,18 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||
}
|
||||
|
||||
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorTerminate{})
|
||||
}
|
||||
|
||||
func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
return m.terminator.Flush(ctx)
|
||||
}
|
||||
|
||||
func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return m.terminator.Pending(ctx)
|
||||
}
|
||||
|
||||
// Caller should NOT hold m.unsealedInfoMap.lk
|
||||
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
||||
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else
|
||||
|
11
extern/storage-sealing/sector_state.go
vendored
11
extern/storage-sealing/sector_state.go
vendored
@ -30,6 +30,10 @@ var ExistSectorStateList = map[SectorState]struct{}{
|
||||
Faulty: {},
|
||||
FaultReported: {},
|
||||
FaultedFinal: {},
|
||||
Terminating: {},
|
||||
TerminateWait: {},
|
||||
TerminateFinality: {},
|
||||
TerminateFailed: {},
|
||||
Removing: {},
|
||||
RemoveFailed: {},
|
||||
Removed: {},
|
||||
@ -69,6 +73,11 @@ const (
|
||||
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
|
||||
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
|
||||
|
||||
Terminating SectorState = "Terminating"
|
||||
TerminateWait SectorState = "TerminateWait"
|
||||
TerminateFinality SectorState = "TerminateFinality"
|
||||
TerminateFailed SectorState = "TerminateFailed"
|
||||
|
||||
Removing SectorState = "Removing"
|
||||
RemoveFailed SectorState = "RemoveFailed"
|
||||
Removed SectorState = "Removed"
|
||||
@ -78,7 +87,7 @@ func toStatState(st SectorState) statSectorState {
|
||||
switch st {
|
||||
case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector:
|
||||
return sstSealing
|
||||
case Proving, Removed, Removing:
|
||||
case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed:
|
||||
return sstProving
|
||||
}
|
||||
|
||||
|
16
extern/storage-sealing/states_failed.go
vendored
16
extern/storage-sealing/states_failed.go
vendored
@ -309,6 +309,22 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to
|
||||
// the Terminating state after cooldown. If the API is still failing, well get back to here
|
||||
// with the error in SectorInfo log.
|
||||
pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
if pci != nil {
|
||||
return nil // pause the fsm, needs manual user action
|
||||
}
|
||||
|
||||
if err := failedCooldown(ctx, sector); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminate{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// First make vary sure the sector isn't committed
|
||||
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
|
88
extern/storage-sealing/states_proving.go
vendored
88
extern/storage-sealing/states_proving.go
vendored
@ -1,9 +1,14 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/go-statemachine"
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
)
|
||||
|
||||
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -31,6 +36,89 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf
|
||||
return ctx.Send(SectorFaultedFinal{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// First step of sector termination
|
||||
// * See if sector is live
|
||||
// * If not, goto removing
|
||||
// * Add to termination queue
|
||||
// * Wait for message to land on-chain
|
||||
// * Check for correct termination
|
||||
// * wait for expiration (+winning lookback?)
|
||||
|
||||
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)})
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
// either already terminated or not committed yet
|
||||
|
||||
pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)})
|
||||
}
|
||||
if pci != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
termCid, terminated, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber))
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)})
|
||||
}
|
||||
|
||||
if terminated {
|
||||
return ctx.Send(SectorTerminating{Message: nil})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminating{Message: &termCid})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if sector.TerminateMessage == nil {
|
||||
return xerrors.New("entered TerminateWait with nil TerminateMessage")
|
||||
}
|
||||
|
||||
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)})
|
||||
}
|
||||
|
||||
if mw.Receipt.ExitCode != exitcode.Ok {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("terminate message failed to execute: exit %d: %w", mw.Receipt.ExitCode, err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminated{TerminatedAt: mw.Height})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error {
|
||||
for {
|
||||
tok, epoch, err := m.api.ChainHead(ctx.Context())
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)})
|
||||
}
|
||||
|
||||
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)})
|
||||
}
|
||||
|
||||
if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) {
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second
|
||||
select {
|
||||
case <-time.After(toWait):
|
||||
continue
|
||||
case <-ctx.Context().Done():
|
||||
return ctx.Context().Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
return ctx.Send(SectorRemoveFailed{err})
|
||||
|
351
extern/storage-sealing/terminate_batch.go
vendored
Normal file
351
extern/storage-sealing/terminate_batch.go
vendored
Normal file
@ -0,0 +1,351 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO: config
|
||||
|
||||
TerminateBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k
|
||||
TerminateBatchMin uint64 = 1
|
||||
TerminateBatchWait = 5 * time.Minute
|
||||
)
|
||||
|
||||
type TerminateBatcherApi interface {
|
||||
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
|
||||
StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error)
|
||||
StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error)
|
||||
}
|
||||
|
||||
type TerminateBatcher struct {
|
||||
api TerminateBatcherApi
|
||||
maddr address.Address
|
||||
mctx context.Context
|
||||
addrSel AddrSel
|
||||
feeCfg FeeConfig
|
||||
|
||||
todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
|
||||
|
||||
waiting map[abi.SectorNumber][]chan cid.Cid
|
||||
|
||||
notify, stop, stopped chan struct{}
|
||||
force chan chan *cid.Cid
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher {
|
||||
b := &TerminateBatcher{
|
||||
api: api,
|
||||
maddr: maddr,
|
||||
mctx: mctx,
|
||||
addrSel: addrSel,
|
||||
feeCfg: feeCfg,
|
||||
|
||||
todo: map[SectorLocation]*bitfield.BitField{},
|
||||
waiting: map[abi.SectorNumber][]chan cid.Cid{},
|
||||
|
||||
notify: make(chan struct{}, 1),
|
||||
force: make(chan chan *cid.Cid),
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
|
||||
go b.run()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) run() {
|
||||
var forceRes chan *cid.Cid
|
||||
var lastMsg *cid.Cid
|
||||
|
||||
for {
|
||||
if forceRes != nil {
|
||||
forceRes <- lastMsg
|
||||
forceRes = nil
|
||||
}
|
||||
lastMsg = nil
|
||||
|
||||
var sendAboveMax, sendAboveMin bool
|
||||
select {
|
||||
case <-b.stop:
|
||||
close(b.stopped)
|
||||
return
|
||||
case <-b.notify:
|
||||
sendAboveMax = true
|
||||
case <-time.After(TerminateBatchWait):
|
||||
sendAboveMin = true
|
||||
case fr := <-b.force: // user triggered
|
||||
forceRes = fr
|
||||
}
|
||||
|
||||
var err error
|
||||
lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin)
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher processBatch error", "error", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
|
||||
dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("getting proving deadline info failed: %w", err)
|
||||
}
|
||||
|
||||
b.lk.Lock()
|
||||
defer b.lk.Unlock()
|
||||
params := miner2.TerminateSectorsParams{}
|
||||
|
||||
var total uint64
|
||||
for loc, sectors := range b.todo {
|
||||
n, err := sectors.Count()
|
||||
if err != nil {
|
||||
log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// don't send terminations for currently challenged sectors
|
||||
if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain)
|
||||
loc.Deadline == dl.Index || // not in current
|
||||
(loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous
|
||||
continue
|
||||
}
|
||||
|
||||
if n < 1 {
|
||||
log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
|
||||
continue
|
||||
}
|
||||
|
||||
toTerminate, err := sectors.Copy()
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher: copy sectors bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if total+n > uint64(miner.AddressedSectorsMax) {
|
||||
n = uint64(miner.AddressedSectorsMax) - total
|
||||
|
||||
toTerminate, err = toTerminate.Slice(0, n)
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher: slice toTerminate bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
|
||||
continue
|
||||
}
|
||||
|
||||
s, err := bitfield.SubtractBitField(*sectors, toTerminate)
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
|
||||
continue
|
||||
}
|
||||
*sectors = s
|
||||
}
|
||||
|
||||
total += n
|
||||
|
||||
params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
|
||||
Deadline: loc.Deadline,
|
||||
Partition: loc.Partition,
|
||||
Sectors: toTerminate,
|
||||
})
|
||||
|
||||
if total >= uint64(miner.AddressedSectorsMax) {
|
||||
break
|
||||
}
|
||||
|
||||
if len(params.Terminations) >= miner.DeclarationsMax {
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
if len(params.Terminations) == 0 {
|
||||
return nil, nil // nothing to do
|
||||
}
|
||||
|
||||
if notif && total < TerminateBatchMax {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
if after && total < TerminateBatchMin {
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
enc := new(bytes.Buffer)
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err)
|
||||
}
|
||||
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("couldn't get miner info: %w", err)
|
||||
}
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("no good address found: %w", err)
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sending message failed: %w", err)
|
||||
}
|
||||
log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
|
||||
|
||||
for _, t := range params.Terminations {
|
||||
delete(b.todo, SectorLocation{
|
||||
Deadline: t.Deadline,
|
||||
Partition: t.Partition,
|
||||
})
|
||||
|
||||
err := t.Sectors.ForEach(func(sn uint64) error {
|
||||
for _, ch := range b.waiting[abi.SectorNumber(sn)] {
|
||||
ch <- mcid // buffered
|
||||
}
|
||||
delete(b.waiting, abi.SectorNumber(sn))
|
||||
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("sectors foreach: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
return &mcid, nil
|
||||
}
|
||||
|
||||
// register termination, wait for batch message, return message CID
|
||||
// can return cid.Undef,true if the sector is already terminated on-chain
|
||||
func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (mcid cid.Cid, terminated bool, err error) {
|
||||
maddr, err := address.NewIDAddress(uint64(s.Miner))
|
||||
if err != nil {
|
||||
return cid.Undef, false, err
|
||||
}
|
||||
|
||||
loc, err := b.api.StateSectorPartition(ctx, maddr, s.Number, nil)
|
||||
if err != nil {
|
||||
return cid.Undef, false, xerrors.Errorf("getting sector location: %w", err)
|
||||
}
|
||||
if loc == nil {
|
||||
return cid.Undef, false, xerrors.New("sector location not found")
|
||||
}
|
||||
|
||||
{
|
||||
// check if maybe already terminated
|
||||
parts, err := b.api.StateMinerPartitions(ctx, maddr, loc.Deadline, nil)
|
||||
if err != nil {
|
||||
return cid.Cid{}, false, xerrors.Errorf("getting partitions: %w", err)
|
||||
}
|
||||
live, err := parts[loc.Partition].LiveSectors.IsSet(uint64(s.Number))
|
||||
if err != nil {
|
||||
return cid.Cid{}, false, xerrors.Errorf("checking if sector is in live set: %w", err)
|
||||
}
|
||||
if !live {
|
||||
// already terminated
|
||||
return cid.Undef, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
b.lk.Lock()
|
||||
bf, ok := b.todo[*loc]
|
||||
if !ok {
|
||||
n := bitfield.New()
|
||||
bf = &n
|
||||
b.todo[*loc] = bf
|
||||
}
|
||||
bf.Set(uint64(s.Number))
|
||||
|
||||
sent := make(chan cid.Cid, 1)
|
||||
b.waiting[s.Number] = append(b.waiting[s.Number], sent)
|
||||
|
||||
select {
|
||||
case b.notify <- struct{}{}:
|
||||
default: // already have a pending notification, don't need more
|
||||
}
|
||||
b.lk.Unlock()
|
||||
|
||||
select {
|
||||
case c := <-sent:
|
||||
return c, false, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, false, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) {
|
||||
resCh := make(chan *cid.Cid, 1)
|
||||
select {
|
||||
case b.force <- resCh:
|
||||
select {
|
||||
case res := <-resCh:
|
||||
return res, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
b.lk.Lock()
|
||||
defer b.lk.Unlock()
|
||||
|
||||
mid, err := address.IDFromAddress(b.maddr)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
res := make([]abi.SectorID, 0)
|
||||
for _, bf := range b.todo {
|
||||
err := bf.ForEach(func(id uint64) error {
|
||||
res = append(res, abi.SectorID{
|
||||
Miner: abi.ActorID(mid),
|
||||
Number: abi.SectorNumber(id),
|
||||
})
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
sort.Slice(res, func(i, j int) bool {
|
||||
if res[i].Miner != res[j].Miner {
|
||||
return res[i].Miner < res[j].Miner
|
||||
}
|
||||
|
||||
return res[i].Number < res[j].Number
|
||||
})
|
||||
|
||||
return res, nil
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) Stop(ctx context.Context) error {
|
||||
close(b.stop)
|
||||
|
||||
select {
|
||||
case <-b.stopped:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
4
extern/storage-sealing/types.go
vendored
4
extern/storage-sealing/types.go
vendored
@ -103,6 +103,10 @@ type SectorInfo struct {
|
||||
// Recovery
|
||||
Return ReturnState
|
||||
|
||||
// Termination
|
||||
TerminateMessage *cid.Cid
|
||||
TerminatedAt abi.ChainEpoch
|
||||
|
||||
// Debug
|
||||
LastErr string
|
||||
|
||||
|
@ -69,6 +69,7 @@ type SealingConfig struct {
|
||||
type MinerFeeConfig struct {
|
||||
MaxPreCommitGasFee types.FIL
|
||||
MaxCommitGasFee types.FIL
|
||||
MaxTerminateGasFee types.FIL
|
||||
MaxWindowPoStGasFee types.FIL
|
||||
MaxPublishDealsFee types.FIL
|
||||
MaxMarketBalanceAddFee types.FIL
|
||||
@ -211,6 +212,7 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
Fees: MinerFeeConfig{
|
||||
MaxPreCommitGasFee: types.MustParseFIL("0.025"),
|
||||
MaxCommitGasFee: types.MustParseFIL("0.05"),
|
||||
MaxTerminateGasFee: types.MustParseFIL("0.5"),
|
||||
MaxWindowPoStGasFee: types.MustParseFIL("5"),
|
||||
MaxPublishDealsFee: types.MustParseFIL("0.05"),
|
||||
MaxMarketBalanceAddFee: types.MustParseFIL("0.007"),
|
||||
|
@ -328,6 +328,18 @@ func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber
|
||||
return sm.Miner.RemoveSector(ctx, id)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error {
|
||||
return sm.Miner.TerminateSector(ctx, id)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
return sm.Miner.TerminateFlush(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return sm.Miner.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
|
||||
return sm.Miner.MarkForUpgrade(id)
|
||||
}
|
||||
|
@ -164,6 +164,20 @@ func TestWindowedPost(t *testing.T) {
|
||||
test.TestWindowPost(t, builder.MockSbBuilder, 2*time.Millisecond, 10)
|
||||
}
|
||||
|
||||
func TestTerminate(t *testing.T) {
|
||||
if os.Getenv("LOTUS_TEST_WINDOW_POST") != "1" {
|
||||
t.Skip("this takes a few minutes, set LOTUS_TEST_WINDOW_POST=1 to run")
|
||||
}
|
||||
|
||||
logging.SetLogLevel("miner", "ERROR")
|
||||
logging.SetLogLevel("chainstore", "ERROR")
|
||||
logging.SetLogLevel("chain", "ERROR")
|
||||
logging.SetLogLevel("sub", "ERROR")
|
||||
logging.SetLogLevel("storageminer", "ERROR")
|
||||
|
||||
test.TestTerminate(t, builder.MockSbBuilder, 2*time.Millisecond)
|
||||
}
|
||||
|
||||
func TestCCUpgrade(t *testing.T) {
|
||||
logging.SetLogLevel("miner", "ERROR")
|
||||
logging.SetLogLevel("chainstore", "ERROR")
|
||||
|
@ -4,8 +4,6 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
cbg "github.com/whyrusleeping/cbor-gen"
|
||||
"golang.org/x/xerrors"
|
||||
@ -14,6 +12,8 @@ import (
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
"github.com/filecoin-project/go-state-types/crypto"
|
||||
"github.com/filecoin-project/go-state-types/dline"
|
||||
"github.com/filecoin-project/go-state-types/network"
|
||||
|
||||
market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market"
|
||||
|
||||
@ -243,6 +243,15 @@ func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr addre
|
||||
return nil, nil // not found
|
||||
}
|
||||
|
||||
func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tok sealing.TipSetToken) ([]api.Partition, error) {
|
||||
tsk, err := types.TipSetKeyFromBytes(tok)
|
||||
if err != nil {
|
||||
return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err)
|
||||
}
|
||||
|
||||
return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk)
|
||||
}
|
||||
|
||||
func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, error) {
|
||||
tsk, err := types.TipSetKeyFromBytes(tok)
|
||||
if err != nil {
|
||||
@ -266,6 +275,15 @@ func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok sealing.
|
||||
return s.delegate.StateNetworkVersion(ctx, tsk)
|
||||
}
|
||||
|
||||
func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (*dline.Info, error) {
|
||||
tsk, err := types.TipSetKeyFromBytes(tok)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return s.delegate.StateMinerProvingDeadline(ctx, maddr, tsk)
|
||||
}
|
||||
|
||||
func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) {
|
||||
msg := types.Message{
|
||||
To: to,
|
||||
|
@ -30,6 +30,8 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi m
|
||||
addrs = append(addrs, as.PreCommitControl...)
|
||||
case api.CommitAddr:
|
||||
addrs = append(addrs, as.CommitControl...)
|
||||
case api.TerminateSectorsAddr:
|
||||
addrs = append(addrs, as.TerminateControl...)
|
||||
default:
|
||||
defaultCtl := map[address.Address]struct{}{}
|
||||
for _, a := range mi.ControlAddresses {
|
||||
|
@ -148,6 +148,7 @@ func (m *Miner) Run(ctx context.Context) error {
|
||||
fc := sealing.FeeConfig{
|
||||
MaxPreCommitGasFee: abi.TokenAmount(m.feeCfg.MaxPreCommitGasFee),
|
||||
MaxCommitGasFee: abi.TokenAmount(m.feeCfg.MaxCommitGasFee),
|
||||
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
|
||||
}
|
||||
|
||||
evts := events.NewEvents(ctx, m.api)
|
||||
|
@ -4,6 +4,8 @@ import (
|
||||
"context"
|
||||
"io"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
|
||||
@ -44,6 +46,18 @@ func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error {
|
||||
return m.sealing.Remove(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error {
|
||||
return m.sealing.Terminate(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) {
|
||||
return m.sealing.TerminateFlush(ctx)
|
||||
}
|
||||
|
||||
func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) {
|
||||
return m.sealing.TerminatePending(ctx)
|
||||
}
|
||||
|
||||
func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error {
|
||||
return m.sealing.MarkForUpgrade(id)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user