Initial sector termination support
This commit is contained in:
parent
56277756a6
commit
52cc2cd3eb
@ -65,7 +65,11 @@ type StorageMiner interface {
|
||||
// SectorGetExpectedSealDuration gets the expected time for a sector to seal
|
||||
SectorGetExpectedSealDuration(context.Context) (time.Duration, error)
|
||||
SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error
|
||||
// SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can
|
||||
// be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties.
|
||||
SectorRemove(context.Context, abi.SectorNumber) error
|
||||
// SectorTerminate terminates the sector on-chain, then automatically removes it from storage
|
||||
SectorTerminate(context.Context, abi.SectorNumber) error
|
||||
SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error
|
||||
|
||||
StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error)
|
||||
@ -217,6 +221,8 @@ const (
|
||||
PreCommitAddr AddrUse = iota
|
||||
CommitAddr
|
||||
PoStAddr
|
||||
|
||||
TerminateSectorsAddr
|
||||
)
|
||||
|
||||
type AddressConfig struct {
|
||||
|
@ -314,6 +314,7 @@ type StorageMinerStruct struct {
|
||||
SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"`
|
||||
SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"`
|
||||
SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"`
|
||||
SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"`
|
||||
SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"`
|
||||
|
||||
WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm
|
||||
@ -1310,6 +1311,10 @@ func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.Sector
|
||||
return c.Internal.SectorRemove(ctx, number)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.SectorNumber) error {
|
||||
return c.Internal.SectorTerminate(ctx, number)
|
||||
}
|
||||
|
||||
func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error {
|
||||
return c.Internal.SectorMarkForUpgrade(ctx, number)
|
||||
}
|
||||
|
@ -35,6 +35,7 @@ var sectorsCmd = &cli.Command{
|
||||
sectorsRefsCmd,
|
||||
sectorsUpdateCmd,
|
||||
sectorsPledgeCmd,
|
||||
sectorsTerminateCmd,
|
||||
sectorsRemoveCmd,
|
||||
sectorsMarkForUpgradeCmd,
|
||||
sectorsStartSealCmd,
|
||||
@ -396,9 +397,42 @@ var sectorsRefsCmd = &cli.Command{
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsTerminateCmd = &cli.Command{
|
||||
Name: "terminate",
|
||||
Usage: "Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)",
|
||||
ArgsUsage: "<sectorNum>",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
Name: "really-do-it",
|
||||
Usage: "pass this flag if you know what you are doing",
|
||||
},
|
||||
},
|
||||
Action: func(cctx *cli.Context) error {
|
||||
if !cctx.Bool("really-do-it") {
|
||||
return xerrors.Errorf("pass --really-do-it to confirm this action")
|
||||
}
|
||||
nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer closer()
|
||||
ctx := lcli.ReqContext(cctx)
|
||||
if cctx.Args().Len() != 1 {
|
||||
return xerrors.Errorf("must pass sector number")
|
||||
}
|
||||
|
||||
id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("could not parse sector number: %w", err)
|
||||
}
|
||||
|
||||
return nodeApi.SectorTerminate(ctx, abi.SectorNumber(id))
|
||||
},
|
||||
}
|
||||
|
||||
var sectorsRemoveCmd = &cli.Command{
|
||||
Name: "remove",
|
||||
Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)",
|
||||
Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))",
|
||||
ArgsUsage: "<sectorNum>",
|
||||
Flags: []cli.Flag{
|
||||
&cli.BoolFlag{
|
||||
|
95
extern/storage-sealing/cbor_gen.go
vendored
95
extern/storage-sealing/cbor_gen.go
vendored
@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
_, err := w.Write(cbg.CborNull)
|
||||
return err
|
||||
}
|
||||
if _, err := w.Write([]byte{183}); err != nil {
|
||||
if _, err := w.Write([]byte{184, 25}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -928,6 +928,50 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.TerminateMessage (cid.Cid) (struct)
|
||||
if len("TerminateMessage") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"TerminateMessage\" was too long")
|
||||
}
|
||||
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminateMessage"))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string("TerminateMessage")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.TerminateMessage == nil {
|
||||
if _, err := w.Write(cbg.CborNull); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteCidBuf(scratch, w, *t.TerminateMessage); err != nil {
|
||||
return xerrors.Errorf("failed to write cid field t.TerminateMessage: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
// t.TerminatedAt (abi.ChainEpoch) (int64)
|
||||
if len("TerminatedAt") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"TerminatedAt\" was too long")
|
||||
}
|
||||
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminatedAt"))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string("TerminatedAt")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if t.TerminatedAt >= 0 {
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.TerminatedAt)); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.TerminatedAt-1)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
// t.LastErr (string) (string)
|
||||
if len("LastErr") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"LastErr\" was too long")
|
||||
@ -1441,6 +1485,55 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error {
|
||||
|
||||
t.Return = ReturnState(sval)
|
||||
}
|
||||
// t.TerminateMessage (cid.Cid) (struct)
|
||||
case "TerminateMessage":
|
||||
|
||||
{
|
||||
|
||||
b, err := br.ReadByte()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if b != cbg.CborNull[0] {
|
||||
if err := br.UnreadByte(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
c, err := cbg.ReadCid(br)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("failed to read cid field t.TerminateMessage: %w", err)
|
||||
}
|
||||
|
||||
t.TerminateMessage = &c
|
||||
}
|
||||
|
||||
}
|
||||
// t.TerminatedAt (abi.ChainEpoch) (int64)
|
||||
case "TerminatedAt":
|
||||
{
|
||||
maj, extra, err := cbg.CborReadHeaderBuf(br, scratch)
|
||||
var extraI int64
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
switch maj {
|
||||
case cbg.MajUnsignedInt:
|
||||
extraI = int64(extra)
|
||||
if extraI < 0 {
|
||||
return fmt.Errorf("int64 positive overflow")
|
||||
}
|
||||
case cbg.MajNegativeInt:
|
||||
extraI = int64(extra)
|
||||
if extraI < 0 {
|
||||
return fmt.Errorf("int64 negative oveflow")
|
||||
}
|
||||
extraI = -1 - extraI
|
||||
default:
|
||||
return fmt.Errorf("wrong type for int64 field: %d", maj)
|
||||
}
|
||||
|
||||
t.TerminatedAt = abi.ChainEpoch(extraI)
|
||||
}
|
||||
// t.LastErr (string) (string)
|
||||
case "LastErr":
|
||||
|
||||
|
23
extern/storage-sealing/fsm.go
vendored
23
extern/storage-sealing/fsm.go
vendored
@ -148,6 +148,21 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
||||
on(SectorFaultReported{}, FaultReported),
|
||||
on(SectorFaulty{}, Faulty),
|
||||
),
|
||||
Terminating: planOne(
|
||||
on(SectorTerminating{}, TerminateWait),
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
),
|
||||
TerminateWait: planOne(
|
||||
on(SectorTerminated{}, TerminateFinality),
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
),
|
||||
TerminateFinality: planOne(
|
||||
on(SectorTerminateFailed{}, TerminateFailed),
|
||||
// SectorRemove (global)
|
||||
),
|
||||
TerminateFailed: planOne(
|
||||
// SectorTerminating (global)
|
||||
),
|
||||
Removing: planOne(
|
||||
on(SectorRemoved{}, Removed),
|
||||
on(SectorRemoveFailed{}, RemoveFailed),
|
||||
@ -328,6 +343,14 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
// Post-seal
|
||||
case Proving:
|
||||
return m.handleProvingSector, processed, nil
|
||||
case Terminating:
|
||||
return m.handleTerminating, processed, nil
|
||||
case TerminateWait:
|
||||
return m.handleTerminateWait, processed, nil
|
||||
case TerminateFinality:
|
||||
return m.handleTerminateFinality, processed, nil
|
||||
case TerminateFailed:
|
||||
return m.handleTerminateFailed, processed, nil
|
||||
case Removing:
|
||||
return m.handleRemoving, processed, nil
|
||||
case Removed:
|
||||
|
26
extern/storage-sealing/fsm_events.go
vendored
26
extern/storage-sealing/fsm_events.go
vendored
@ -314,6 +314,32 @@ func (evt SectorFaultReported) apply(state *SectorInfo) {
|
||||
|
||||
type SectorFaultedFinal struct{}
|
||||
|
||||
// Terminating
|
||||
|
||||
type SectorTerminate struct{}
|
||||
|
||||
func (evt SectorTerminate) applyGlobal(state *SectorInfo) bool {
|
||||
state.State = Terminating
|
||||
return true
|
||||
}
|
||||
|
||||
type SectorTerminating struct{ Message cid.Cid }
|
||||
|
||||
func (evt SectorTerminating) apply(state *SectorInfo) {
|
||||
state.TerminateMessage = &evt.Message
|
||||
}
|
||||
|
||||
type SectorTerminated struct{ TerminatedAt abi.ChainEpoch }
|
||||
|
||||
func (evt SectorTerminated) apply(state *SectorInfo) {
|
||||
state.TerminatedAt = evt.TerminatedAt
|
||||
}
|
||||
|
||||
type SectorTerminateFailed struct{ error }
|
||||
|
||||
func (evt SectorTerminateFailed) FormatError(xerrors.Printer) (next error) { return evt.error }
|
||||
func (evt SectorTerminateFailed) apply(*SectorInfo) {}
|
||||
|
||||
// External events
|
||||
|
||||
type SectorRemove struct{}
|
||||
|
18
extern/storage-sealing/sealing.go
vendored
18
extern/storage-sealing/sealing.go
vendored
@ -94,12 +94,15 @@ type Sealing struct {
|
||||
|
||||
stats SectorStats
|
||||
|
||||
terminator *TerminateBatcher
|
||||
|
||||
getConfig GetSealingConfigFunc
|
||||
}
|
||||
|
||||
type FeeConfig struct {
|
||||
MaxPreCommitGasFee abi.TokenAmount
|
||||
MaxCommitGasFee abi.TokenAmount
|
||||
MaxTerminateGasFee abi.TokenAmount
|
||||
}
|
||||
|
||||
type UnsealedSectorMap struct {
|
||||
@ -136,6 +139,8 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
||||
notifee: notifee,
|
||||
addrSel: as,
|
||||
|
||||
terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc),
|
||||
|
||||
getConfig: gc,
|
||||
|
||||
stats: SectorStats{
|
||||
@ -160,7 +165,14 @@ func (m *Sealing) Run(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (m *Sealing) Stop(ctx context.Context) error {
|
||||
return m.sectors.Stop(ctx)
|
||||
if err := m.terminator.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := m.sectors.Stop(ctx); err != nil {
|
||||
return err
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) {
|
||||
@ -265,6 +277,10 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorRemove{})
|
||||
}
|
||||
|
||||
func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error {
|
||||
return m.sectors.Send(uint64(sid), SectorTerminate{})
|
||||
}
|
||||
|
||||
// Caller should NOT hold m.unsealedInfoMap.lk
|
||||
func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error {
|
||||
// locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else
|
||||
|
5
extern/storage-sealing/sector_state.go
vendored
5
extern/storage-sealing/sector_state.go
vendored
@ -69,6 +69,11 @@ const (
|
||||
FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain
|
||||
FaultedFinal SectorState = "FaultedFinal" // fault declared on chain
|
||||
|
||||
Terminating SectorState = "Terminating"
|
||||
TerminateWait SectorState = "TerminateWait"
|
||||
TerminateFinality SectorState = "TerminateFinality"
|
||||
TerminateFailed SectorState = "TerminateFailed"
|
||||
|
||||
Removing SectorState = "Removing"
|
||||
RemoveFailed SectorState = "RemoveFailed"
|
||||
Removed SectorState = "Removed"
|
||||
|
8
extern/storage-sealing/states_failed.go
vendored
8
extern/storage-sealing/states_failed.go
vendored
@ -309,6 +309,14 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := failedCooldown(ctx, sector); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminate{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// First make vary sure the sector isn't committed
|
||||
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
|
77
extern/storage-sealing/states_proving.go
vendored
77
extern/storage-sealing/states_proving.go
vendored
@ -1,9 +1,15 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"time"
|
||||
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/exitcode"
|
||||
"github.com/filecoin-project/go-statemachine"
|
||||
|
||||
"github.com/filecoin-project/lotus/build"
|
||||
"github.com/filecoin-project/lotus/chain/actors/policy"
|
||||
)
|
||||
|
||||
func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error {
|
||||
@ -31,6 +37,77 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf
|
||||
return ctx.Send(SectorFaultedFinal{})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// First step of sector termination
|
||||
// * See if sector is live
|
||||
// * If not, goto removing
|
||||
// * Add to termination queue
|
||||
// * Wait for message to land on-chain
|
||||
// * Check for correct termination
|
||||
// * wait for expiration (+winning lookback?)
|
||||
|
||||
si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)})
|
||||
}
|
||||
|
||||
if si == nil {
|
||||
// either already terminated or not committed yet
|
||||
// todo / edge case - may be in process of being committed, but let's call that really unlikely
|
||||
return ctx.Send(SectorRemoved{})
|
||||
}
|
||||
|
||||
termCid, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber))
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminating{Message: termCid})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if sector.TerminateMessage == nil {
|
||||
return xerrors.New("entered TerminateWait with nil TerminateMessage")
|
||||
}
|
||||
|
||||
mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)})
|
||||
}
|
||||
|
||||
if mw.Receipt.ExitCode != exitcode.Ok {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("terminate message failed to execute: exit %d: %w", mw.Receipt.ExitCode, err)})
|
||||
}
|
||||
|
||||
return ctx.Send(SectorTerminated{TerminatedAt: mw.Height})
|
||||
}
|
||||
|
||||
func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error {
|
||||
for {
|
||||
tok, epoch, err := m.api.ChainHead(ctx.Context())
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)})
|
||||
}
|
||||
|
||||
nv, err := m.api.StateNetworkVersion(ctx.Context(), tok)
|
||||
if err != nil {
|
||||
return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)})
|
||||
}
|
||||
|
||||
if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) {
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second
|
||||
select {
|
||||
case <-time.After(toWait):
|
||||
continue
|
||||
case <-ctx.Context().Done():
|
||||
return ctx.Context().Err()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil {
|
||||
return ctx.Send(SectorRemoveFailed{err})
|
||||
|
216
extern/storage-sealing/terminate_batch.go
vendored
Normal file
216
extern/storage-sealing/terminate_batch.go
vendored
Normal file
@ -0,0 +1,216 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-state-types/big"
|
||||
miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-bitfield"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/chain/actors/builtin/miner"
|
||||
)
|
||||
|
||||
var (
|
||||
// TODO: config
|
||||
|
||||
TerminateBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k
|
||||
TerminateBatchMin uint64 = 1
|
||||
TerminateBatchWait = 5 * time.Minute
|
||||
)
|
||||
|
||||
type TerminateBatcherApi interface {
|
||||
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
||||
SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error)
|
||||
StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error)
|
||||
}
|
||||
|
||||
type TerminateBatcher struct {
|
||||
api TerminateBatcherApi
|
||||
maddr address.Address
|
||||
mctx context.Context
|
||||
addrSel AddrSel
|
||||
feeCfg FeeConfig
|
||||
|
||||
todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField
|
||||
|
||||
waiting map[SectorLocation][]chan cid.Cid
|
||||
|
||||
notify, force, stop, stopped chan struct{}
|
||||
lk sync.Mutex
|
||||
}
|
||||
|
||||
func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher {
|
||||
b := &TerminateBatcher{
|
||||
api: api,
|
||||
maddr: maddr,
|
||||
mctx: mctx,
|
||||
addrSel: addrSel,
|
||||
feeCfg: feeCfg,
|
||||
|
||||
todo: map[SectorLocation]*bitfield.BitField{},
|
||||
waiting: map[SectorLocation][]chan cid.Cid{},
|
||||
|
||||
notify: make(chan struct{}, 1),
|
||||
force: make(chan struct{}),
|
||||
stop: make(chan struct{}),
|
||||
stopped: make(chan struct{}),
|
||||
}
|
||||
|
||||
go b.run()
|
||||
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) run() {
|
||||
for {
|
||||
var notif, after bool
|
||||
select {
|
||||
case <-b.stop:
|
||||
close(b.stopped)
|
||||
return
|
||||
case <-b.notify:
|
||||
notif = true // send above max
|
||||
case <-time.After(TerminateBatchWait):
|
||||
after = true // send above min
|
||||
case <-b.force: // user triggered
|
||||
}
|
||||
|
||||
b.lk.Lock()
|
||||
params := miner2.TerminateSectorsParams{}
|
||||
|
||||
for loc, sectors := range b.todo {
|
||||
n, err := sectors.Count()
|
||||
if err != nil {
|
||||
log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
|
||||
}
|
||||
|
||||
if notif && n < TerminateBatchMax {
|
||||
continue
|
||||
}
|
||||
if after && n < TerminateBatchMin {
|
||||
continue
|
||||
}
|
||||
if n < 1 {
|
||||
log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
|
||||
continue
|
||||
}
|
||||
|
||||
params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
|
||||
Deadline: loc.Deadline,
|
||||
Partition: loc.Partition,
|
||||
Sectors: *sectors,
|
||||
})
|
||||
}
|
||||
|
||||
if len(params.Terminations) == 0 {
|
||||
b.lk.Unlock()
|
||||
continue // nothing to do
|
||||
}
|
||||
|
||||
enc := new(bytes.Buffer)
|
||||
if err := params.MarshalCBOR(enc); err != nil {
|
||||
log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err)
|
||||
b.lk.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher: couldn't get miner info", "error", err)
|
||||
b.lk.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
|
||||
if err != nil {
|
||||
log.Warnw("TerminateBatcher: no good address found", "error", err)
|
||||
b.lk.Unlock()
|
||||
continue
|
||||
}
|
||||
|
||||
mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
|
||||
if err != nil {
|
||||
log.Errorw("TerminateBatcher: sending message failed", "error", err)
|
||||
b.lk.Unlock()
|
||||
continue
|
||||
}
|
||||
log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
|
||||
|
||||
for _, t := range params.Terminations {
|
||||
delete(b.todo, SectorLocation{
|
||||
Deadline: t.Deadline,
|
||||
Partition: t.Partition,
|
||||
})
|
||||
}
|
||||
|
||||
for _, w := range b.waiting {
|
||||
for _, ch := range w {
|
||||
ch <- mcid // buffered
|
||||
}
|
||||
}
|
||||
b.waiting = map[SectorLocation][]chan cid.Cid{}
|
||||
|
||||
b.lk.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
// register termination, wait for batch message, return message CID
|
||||
func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) {
|
||||
maddr, err := address.NewIDAddress(uint64(s.Miner))
|
||||
if err != nil {
|
||||
return cid.Undef, err
|
||||
}
|
||||
|
||||
loc, err := b.api.StateSectorPartition(ctx, maddr, s.Number, nil)
|
||||
if err != nil {
|
||||
return cid.Undef, xerrors.Errorf("getting sector location: %w", err)
|
||||
}
|
||||
if loc == nil {
|
||||
return cid.Undef, xerrors.New("sector location not found")
|
||||
}
|
||||
|
||||
b.lk.Lock()
|
||||
bf, ok := b.todo[*loc]
|
||||
if !ok {
|
||||
n := bitfield.New()
|
||||
bf = &n
|
||||
b.todo[*loc] = bf
|
||||
}
|
||||
bf.Set(uint64(s.Number))
|
||||
|
||||
sent := make(chan cid.Cid, 1)
|
||||
b.waiting[*loc] = append(b.waiting[*loc], sent)
|
||||
|
||||
select {
|
||||
case b.notify <- struct{}{}:
|
||||
default: // already have a pending notification, don't need more
|
||||
}
|
||||
b.lk.Unlock()
|
||||
|
||||
select {
|
||||
case c := <-sent:
|
||||
return c, nil
|
||||
case <-ctx.Done():
|
||||
return cid.Undef, ctx.Err()
|
||||
}
|
||||
}
|
||||
|
||||
func (b *TerminateBatcher) Stop(ctx context.Context) error {
|
||||
close(b.stop)
|
||||
|
||||
select {
|
||||
case <-b.stopped:
|
||||
return nil
|
||||
case <-ctx.Done():
|
||||
return ctx.Err()
|
||||
}
|
||||
}
|
4
extern/storage-sealing/types.go
vendored
4
extern/storage-sealing/types.go
vendored
@ -103,6 +103,10 @@ type SectorInfo struct {
|
||||
// Recovery
|
||||
Return ReturnState
|
||||
|
||||
// Termination
|
||||
TerminateMessage *cid.Cid
|
||||
TerminatedAt abi.ChainEpoch
|
||||
|
||||
// Debug
|
||||
LastErr string
|
||||
|
||||
|
@ -69,6 +69,7 @@ type SealingConfig struct {
|
||||
type MinerFeeConfig struct {
|
||||
MaxPreCommitGasFee types.FIL
|
||||
MaxCommitGasFee types.FIL
|
||||
MaxTerminateGasFee types.FIL
|
||||
MaxWindowPoStGasFee types.FIL
|
||||
MaxPublishDealsFee types.FIL
|
||||
MaxMarketBalanceAddFee types.FIL
|
||||
@ -211,6 +212,7 @@ func DefaultStorageMiner() *StorageMiner {
|
||||
Fees: MinerFeeConfig{
|
||||
MaxPreCommitGasFee: types.MustParseFIL("0.025"),
|
||||
MaxCommitGasFee: types.MustParseFIL("0.05"),
|
||||
MaxTerminateGasFee: types.MustParseFIL("0.5"),
|
||||
MaxWindowPoStGasFee: types.MustParseFIL("5"),
|
||||
MaxPublishDealsFee: types.MustParseFIL("0.05"),
|
||||
MaxMarketBalanceAddFee: types.MustParseFIL("0.007"),
|
||||
|
@ -328,6 +328,10 @@ func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber
|
||||
return sm.Miner.RemoveSector(ctx, id)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error {
|
||||
return sm.Miner.TerminateSector(ctx, id)
|
||||
}
|
||||
|
||||
func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error {
|
||||
return sm.Miner.MarkForUpgrade(id)
|
||||
}
|
||||
|
@ -148,6 +148,7 @@ func (m *Miner) Run(ctx context.Context) error {
|
||||
fc := sealing.FeeConfig{
|
||||
MaxPreCommitGasFee: abi.TokenAmount(m.feeCfg.MaxPreCommitGasFee),
|
||||
MaxCommitGasFee: abi.TokenAmount(m.feeCfg.MaxCommitGasFee),
|
||||
MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee),
|
||||
}
|
||||
|
||||
evts := events.NewEvents(ctx, m.api)
|
||||
|
@ -44,6 +44,10 @@ func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error {
|
||||
return m.sealing.Remove(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error {
|
||||
return m.sealing.Terminate(ctx, id)
|
||||
}
|
||||
|
||||
func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error {
|
||||
return m.sealing.MarkForUpgrade(id)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user