From 52cc2cd3ebecf607c7f19e75d425ce1e9be2559d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 13 Jan 2021 00:42:01 +0100 Subject: [PATCH 01/18] Initial sector termination support --- api/api_storage.go | 6 + api/apistruct/struct.go | 5 + cmd/lotus-storage-miner/sectors.go | 36 +++- extern/storage-sealing/cbor_gen.go | 95 +++++++++- extern/storage-sealing/fsm.go | 23 +++ extern/storage-sealing/fsm_events.go | 26 +++ extern/storage-sealing/sealing.go | 18 +- extern/storage-sealing/sector_state.go | 5 + extern/storage-sealing/states_failed.go | 8 + extern/storage-sealing/states_proving.go | 77 ++++++++ extern/storage-sealing/terminate_batch.go | 216 ++++++++++++++++++++++ extern/storage-sealing/types.go | 4 + node/config/def.go | 2 + node/impl/storminer.go | 4 + storage/miner.go | 1 + storage/sealing.go | 4 + 16 files changed, 527 insertions(+), 3 deletions(-) create mode 100644 extern/storage-sealing/terminate_batch.go diff --git a/api/api_storage.go b/api/api_storage.go index 85eb03115..51f5c48bd 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -65,7 +65,11 @@ type StorageMiner interface { // SectorGetExpectedSealDuration gets the expected time for a sector to seal SectorGetExpectedSealDuration(context.Context) (time.Duration, error) SectorsUpdate(context.Context, abi.SectorNumber, SectorState) error + // SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can + // be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. SectorRemove(context.Context, abi.SectorNumber) error + // SectorTerminate terminates the sector on-chain, then automatically removes it from storage + SectorTerminate(context.Context, abi.SectorNumber) error SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) @@ -217,6 +221,8 @@ const ( PreCommitAddr AddrUse = iota CommitAddr PoStAddr + + TerminateSectorsAddr ) type AddressConfig struct { diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 4bf1c0d01..7591d92ac 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -314,6 +314,7 @@ type StorageMinerStruct struct { SectorGetExpectedSealDuration func(context.Context) (time.Duration, error) `perm:"read"` SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"` SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` + SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm @@ -1310,6 +1311,10 @@ func (c *StorageMinerStruct) SectorRemove(ctx context.Context, number abi.Sector return c.Internal.SectorRemove(ctx, number) } +func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.SectorNumber) error { + return c.Internal.SectorTerminate(ctx, number) +} + func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error { return c.Internal.SectorMarkForUpgrade(ctx, number) } diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 1c3e4858c..fa21d1eea 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -35,6 +35,7 @@ var sectorsCmd = &cli.Command{ sectorsRefsCmd, sectorsUpdateCmd, sectorsPledgeCmd, + sectorsTerminateCmd, sectorsRemoveCmd, sectorsMarkForUpgradeCmd, sectorsStartSealCmd, @@ -396,9 +397,42 @@ var sectorsRefsCmd = &cli.Command{ }, } +var sectorsTerminateCmd = &cli.Command{ + Name: "terminate", + Usage: "Terminate sector on-chain then remove (WARNING: This means losing power and collateral for the removed sector)", + ArgsUsage: "", + Flags: []cli.Flag{ + &cli.BoolFlag{ + Name: "really-do-it", + Usage: "pass this flag if you know what you are doing", + }, + }, + Action: func(cctx *cli.Context) error { + if !cctx.Bool("really-do-it") { + return xerrors.Errorf("pass --really-do-it to confirm this action") + } + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + if cctx.Args().Len() != 1 { + return xerrors.Errorf("must pass sector number") + } + + id, err := strconv.ParseUint(cctx.Args().Get(0), 10, 64) + if err != nil { + return xerrors.Errorf("could not parse sector number: %w", err) + } + + return nodeApi.SectorTerminate(ctx, abi.SectorNumber(id)) + }, +} + var sectorsRemoveCmd = &cli.Command{ Name: "remove", - Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector)", + Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))", ArgsUsage: "", Flags: []cli.Flag{ &cli.BoolFlag{ diff --git a/extern/storage-sealing/cbor_gen.go b/extern/storage-sealing/cbor_gen.go index 78765d7b4..70be08ace 100644 --- a/extern/storage-sealing/cbor_gen.go +++ b/extern/storage-sealing/cbor_gen.go @@ -475,7 +475,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { _, err := w.Write(cbg.CborNull) return err } - if _, err := w.Write([]byte{183}); err != nil { + if _, err := w.Write([]byte{184, 25}); err != nil { return err } @@ -928,6 +928,50 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { return err } + // t.TerminateMessage (cid.Cid) (struct) + if len("TerminateMessage") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TerminateMessage\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminateMessage"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("TerminateMessage")); err != nil { + return err + } + + if t.TerminateMessage == nil { + if _, err := w.Write(cbg.CborNull); err != nil { + return err + } + } else { + if err := cbg.WriteCidBuf(scratch, w, *t.TerminateMessage); err != nil { + return xerrors.Errorf("failed to write cid field t.TerminateMessage: %w", err) + } + } + + // t.TerminatedAt (abi.ChainEpoch) (int64) + if len("TerminatedAt") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"TerminatedAt\" was too long") + } + + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajTextString, uint64(len("TerminatedAt"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("TerminatedAt")); err != nil { + return err + } + + if t.TerminatedAt >= 0 { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajUnsignedInt, uint64(t.TerminatedAt)); err != nil { + return err + } + } else { + if err := cbg.WriteMajorTypeHeaderBuf(scratch, w, cbg.MajNegativeInt, uint64(-t.TerminatedAt-1)); err != nil { + return err + } + } + // t.LastErr (string) (string) if len("LastErr") > cbg.MaxLength { return xerrors.Errorf("Value in field \"LastErr\" was too long") @@ -1441,6 +1485,55 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) error { t.Return = ReturnState(sval) } + // t.TerminateMessage (cid.Cid) (struct) + case "TerminateMessage": + + { + + b, err := br.ReadByte() + if err != nil { + return err + } + if b != cbg.CborNull[0] { + if err := br.UnreadByte(); err != nil { + return err + } + + c, err := cbg.ReadCid(br) + if err != nil { + return xerrors.Errorf("failed to read cid field t.TerminateMessage: %w", err) + } + + t.TerminateMessage = &c + } + + } + // t.TerminatedAt (abi.ChainEpoch) (int64) + case "TerminatedAt": + { + maj, extra, err := cbg.CborReadHeaderBuf(br, scratch) + var extraI int64 + if err != nil { + return err + } + switch maj { + case cbg.MajUnsignedInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 positive overflow") + } + case cbg.MajNegativeInt: + extraI = int64(extra) + if extraI < 0 { + return fmt.Errorf("int64 negative oveflow") + } + extraI = -1 - extraI + default: + return fmt.Errorf("wrong type for int64 field: %d", maj) + } + + t.TerminatedAt = abi.ChainEpoch(extraI) + } // t.LastErr (string) (string) case "LastErr": diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index ea4982d2c..c989d0296 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -148,6 +148,21 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorFaultReported{}, FaultReported), on(SectorFaulty{}, Faulty), ), + Terminating: planOne( + on(SectorTerminating{}, TerminateWait), + on(SectorTerminateFailed{}, TerminateFailed), + ), + TerminateWait: planOne( + on(SectorTerminated{}, TerminateFinality), + on(SectorTerminateFailed{}, TerminateFailed), + ), + TerminateFinality: planOne( + on(SectorTerminateFailed{}, TerminateFailed), + // SectorRemove (global) + ), + TerminateFailed: planOne( + // SectorTerminating (global) + ), Removing: planOne( on(SectorRemoved{}, Removed), on(SectorRemoveFailed{}, RemoveFailed), @@ -328,6 +343,14 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta // Post-seal case Proving: return m.handleProvingSector, processed, nil + case Terminating: + return m.handleTerminating, processed, nil + case TerminateWait: + return m.handleTerminateWait, processed, nil + case TerminateFinality: + return m.handleTerminateFinality, processed, nil + case TerminateFailed: + return m.handleTerminateFailed, processed, nil case Removing: return m.handleRemoving, processed, nil case Removed: diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 59f5e77e6..167217080 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -314,6 +314,32 @@ func (evt SectorFaultReported) apply(state *SectorInfo) { type SectorFaultedFinal struct{} +// Terminating + +type SectorTerminate struct{} + +func (evt SectorTerminate) applyGlobal(state *SectorInfo) bool { + state.State = Terminating + return true +} + +type SectorTerminating struct{ Message cid.Cid } + +func (evt SectorTerminating) apply(state *SectorInfo) { + state.TerminateMessage = &evt.Message +} + +type SectorTerminated struct{ TerminatedAt abi.ChainEpoch } + +func (evt SectorTerminated) apply(state *SectorInfo) { + state.TerminatedAt = evt.TerminatedAt +} + +type SectorTerminateFailed struct{ error } + +func (evt SectorTerminateFailed) FormatError(xerrors.Printer) (next error) { return evt.error } +func (evt SectorTerminateFailed) apply(*SectorInfo) {} + // External events type SectorRemove struct{} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 92beb8ddf..662bd8fba 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -94,12 +94,15 @@ type Sealing struct { stats SectorStats + terminator *TerminateBatcher + getConfig GetSealingConfigFunc } type FeeConfig struct { MaxPreCommitGasFee abi.TokenAmount MaxCommitGasFee abi.TokenAmount + MaxTerminateGasFee abi.TokenAmount } type UnsealedSectorMap struct { @@ -136,6 +139,8 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds notifee: notifee, addrSel: as, + terminator: NewTerminationBatcher(context.TODO(), maddr, api, as, fc), + getConfig: gc, stats: SectorStats{ @@ -160,7 +165,14 @@ func (m *Sealing) Run(ctx context.Context) error { } func (m *Sealing) Stop(ctx context.Context) error { - return m.sectors.Stop(ctx) + if err := m.terminator.Stop(ctx); err != nil { + return err + } + + if err := m.sectors.Stop(ctx); err != nil { + return err + } + return nil } func (m *Sealing) AddPieceToAnySector(ctx context.Context, size abi.UnpaddedPieceSize, r io.Reader, d DealInfo) (abi.SectorNumber, abi.PaddedPieceSize, error) { @@ -265,6 +277,10 @@ func (m *Sealing) Remove(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorRemove{}) } +func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { + return m.sectors.Send(uint64(sid), SectorTerminate{}) +} + // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index ed32a110b..08c9c1b11 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -69,6 +69,11 @@ const ( FaultReported SectorState = "FaultReported" // sector has been declared as a fault on chain FaultedFinal SectorState = "FaultedFinal" // fault declared on chain + Terminating SectorState = "Terminating" + TerminateWait SectorState = "TerminateWait" + TerminateFinality SectorState = "TerminateFinality" + TerminateFailed SectorState = "TerminateFailed" + Removing SectorState = "Removing" RemoveFailed SectorState = "RemoveFailed" Removed SectorState = "Removed" diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index e425606de..1dbf99cda 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -309,6 +309,14 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRemove{}) } +func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error { + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorTerminate{}) +} + func (m *Sealing) handleDealsExpired(ctx statemachine.Context, sector SectorInfo) error { // First make vary sure the sector isn't committed si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index de7e6c8d0..5107bd6c3 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -1,9 +1,15 @@ package sealing import ( + "time" + "golang.org/x/xerrors" + "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine" + + "github.com/filecoin-project/lotus/build" + "github.com/filecoin-project/lotus/chain/actors/policy" ) func (m *Sealing) handleFaulty(ctx statemachine.Context, sector SectorInfo) error { @@ -31,6 +37,77 @@ func (m *Sealing) handleFaultReported(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorFaultedFinal{}) } +func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) error { + // First step of sector termination + // * See if sector is live + // * If not, goto removing + // * Add to termination queue + // * Wait for message to land on-chain + // * Check for correct termination + // * wait for expiration (+winning lookback?) + + si, err := m.api.StateSectorGetInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting sector info: %w", err)}) + } + + if si == nil { + // either already terminated or not committed yet + // todo / edge case - may be in process of being committed, but let's call that really unlikely + return ctx.Send(SectorRemoved{}) + } + + termCid, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber)) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)}) + } + + return ctx.Send(SectorTerminating{Message: termCid}) +} + +func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error { + if sector.TerminateMessage == nil { + return xerrors.New("entered TerminateWait with nil TerminateMessage") + } + + mw, err := m.api.StateWaitMsg(ctx.Context(), *sector.TerminateMessage) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("waiting for terminate message to land on chain: %w", err)}) + } + + if mw.Receipt.ExitCode != exitcode.Ok { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("terminate message failed to execute: exit %d: %w", mw.Receipt.ExitCode, err)}) + } + + return ctx.Send(SectorTerminated{TerminatedAt: mw.Height}) +} + +func (m *Sealing) handleTerminateFinality(ctx statemachine.Context, sector SectorInfo) error { + for { + tok, epoch, err := m.api.ChainHead(ctx.Context()) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting chain head: %w", err)}) + } + + nv, err := m.api.StateNetworkVersion(ctx.Context(), tok) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("getting network version: %w", err)}) + } + + if epoch >= sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv) { + return ctx.Send(SectorRemove{}) + } + + toWait := time.Duration(epoch-sector.TerminatedAt+policy.GetWinningPoStSectorSetLookback(nv)) * time.Duration(build.BlockDelaySecs) * time.Second + select { + case <-time.After(toWait): + continue + case <-ctx.Context().Done(): + return ctx.Context().Err() + } + } +} + func (m *Sealing) handleRemoving(ctx statemachine.Context, sector SectorInfo) error { if err := m.sealer.Remove(ctx.Context(), m.minerSector(sector.SectorType, sector.SectorNumber)); err != nil { return ctx.Send(SectorRemoveFailed{err}) diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go new file mode 100644 index 000000000..fdc61fa21 --- /dev/null +++ b/extern/storage-sealing/terminate_batch.go @@ -0,0 +1,216 @@ +package sealing + +import ( + "bytes" + "context" + "sync" + "time" + + "github.com/ipfs/go-cid" + "golang.org/x/xerrors" + + "github.com/filecoin-project/go-state-types/big" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" + + "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" + "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/chain/actors/builtin/miner" +) + +var ( + // TODO: config + + TerminateBatchMax uint64 = 100 // adjust based on real-world gas numbers, actors limit at 10k + TerminateBatchMin uint64 = 1 + TerminateBatchWait = 5 * time.Minute +) + +type TerminateBatcherApi interface { + StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) + SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) + StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) +} + +type TerminateBatcher struct { + api TerminateBatcherApi + maddr address.Address + mctx context.Context + addrSel AddrSel + feeCfg FeeConfig + + todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField + + waiting map[SectorLocation][]chan cid.Cid + + notify, force, stop, stopped chan struct{} + lk sync.Mutex +} + +func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher { + b := &TerminateBatcher{ + api: api, + maddr: maddr, + mctx: mctx, + addrSel: addrSel, + feeCfg: feeCfg, + + todo: map[SectorLocation]*bitfield.BitField{}, + waiting: map[SectorLocation][]chan cid.Cid{}, + + notify: make(chan struct{}, 1), + force: make(chan struct{}), + stop: make(chan struct{}), + stopped: make(chan struct{}), + } + + go b.run() + + return b +} + +func (b *TerminateBatcher) run() { + for { + var notif, after bool + select { + case <-b.stop: + close(b.stopped) + return + case <-b.notify: + notif = true // send above max + case <-time.After(TerminateBatchWait): + after = true // send above min + case <-b.force: // user triggered + } + + b.lk.Lock() + params := miner2.TerminateSectorsParams{} + + for loc, sectors := range b.todo { + n, err := sectors.Count() + if err != nil { + log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + } + + if notif && n < TerminateBatchMax { + continue + } + if after && n < TerminateBatchMin { + continue + } + if n < 1 { + log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition) + continue + } + + params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{ + Deadline: loc.Deadline, + Partition: loc.Partition, + Sectors: *sectors, + }) + } + + if len(params.Terminations) == 0 { + b.lk.Unlock() + continue // nothing to do + } + + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err) + b.lk.Unlock() + continue + } + + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + log.Warnw("TerminateBatcher: couldn't get miner info", "error", err) + b.lk.Unlock() + continue + } + + from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee) + if err != nil { + log.Warnw("TerminateBatcher: no good address found", "error", err) + b.lk.Unlock() + continue + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes()) + if err != nil { + log.Errorw("TerminateBatcher: sending message failed", "error", err) + b.lk.Unlock() + continue + } + log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations)) + + for _, t := range params.Terminations { + delete(b.todo, SectorLocation{ + Deadline: t.Deadline, + Partition: t.Partition, + }) + } + + for _, w := range b.waiting { + for _, ch := range w { + ch <- mcid // buffered + } + } + b.waiting = map[SectorLocation][]chan cid.Cid{} + + b.lk.Unlock() + } +} + +// register termination, wait for batch message, return message CID +func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) { + maddr, err := address.NewIDAddress(uint64(s.Miner)) + if err != nil { + return cid.Undef, err + } + + loc, err := b.api.StateSectorPartition(ctx, maddr, s.Number, nil) + if err != nil { + return cid.Undef, xerrors.Errorf("getting sector location: %w", err) + } + if loc == nil { + return cid.Undef, xerrors.New("sector location not found") + } + + b.lk.Lock() + bf, ok := b.todo[*loc] + if !ok { + n := bitfield.New() + bf = &n + b.todo[*loc] = bf + } + bf.Set(uint64(s.Number)) + + sent := make(chan cid.Cid, 1) + b.waiting[*loc] = append(b.waiting[*loc], sent) + + select { + case b.notify <- struct{}{}: + default: // already have a pending notification, don't need more + } + b.lk.Unlock() + + select { + case c := <-sent: + return c, nil + case <-ctx.Done(): + return cid.Undef, ctx.Err() + } +} + +func (b *TerminateBatcher) Stop(ctx context.Context) error { + close(b.stop) + + select { + case <-b.stopped: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} diff --git a/extern/storage-sealing/types.go b/extern/storage-sealing/types.go index 8f3e82a0b..1d5073622 100644 --- a/extern/storage-sealing/types.go +++ b/extern/storage-sealing/types.go @@ -103,6 +103,10 @@ type SectorInfo struct { // Recovery Return ReturnState + // Termination + TerminateMessage *cid.Cid + TerminatedAt abi.ChainEpoch + // Debug LastErr string diff --git a/node/config/def.go b/node/config/def.go index 68371c384..a20e0ceaa 100644 --- a/node/config/def.go +++ b/node/config/def.go @@ -69,6 +69,7 @@ type SealingConfig struct { type MinerFeeConfig struct { MaxPreCommitGasFee types.FIL MaxCommitGasFee types.FIL + MaxTerminateGasFee types.FIL MaxWindowPoStGasFee types.FIL MaxPublishDealsFee types.FIL MaxMarketBalanceAddFee types.FIL @@ -211,6 +212,7 @@ func DefaultStorageMiner() *StorageMiner { Fees: MinerFeeConfig{ MaxPreCommitGasFee: types.MustParseFIL("0.025"), MaxCommitGasFee: types.MustParseFIL("0.05"), + MaxTerminateGasFee: types.MustParseFIL("0.5"), MaxWindowPoStGasFee: types.MustParseFIL("5"), MaxPublishDealsFee: types.MustParseFIL("0.05"), MaxMarketBalanceAddFee: types.MustParseFIL("0.007"), diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 7c1328361..62b2614f2 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -328,6 +328,10 @@ func (sm *StorageMinerAPI) SectorRemove(ctx context.Context, id abi.SectorNumber return sm.Miner.RemoveSector(ctx, id) } +func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNumber) error { + return sm.Miner.TerminateSector(ctx, id) +} + func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { return sm.Miner.MarkForUpgrade(id) } diff --git a/storage/miner.go b/storage/miner.go index a0d5a6a92..752d7ff42 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -148,6 +148,7 @@ func (m *Miner) Run(ctx context.Context) error { fc := sealing.FeeConfig{ MaxPreCommitGasFee: abi.TokenAmount(m.feeCfg.MaxPreCommitGasFee), MaxCommitGasFee: abi.TokenAmount(m.feeCfg.MaxCommitGasFee), + MaxTerminateGasFee: abi.TokenAmount(m.feeCfg.MaxTerminateGasFee), } evts := events.NewEvents(ctx, m.api) diff --git a/storage/sealing.go b/storage/sealing.go index 2cd454e5b..d3960509d 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -44,6 +44,10 @@ func (m *Miner) RemoveSector(ctx context.Context, id abi.SectorNumber) error { return m.sealing.Remove(ctx, id) } +func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error { + return m.sealing.Terminate(ctx, id) +} + func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { return m.sealing.MarkForUpgrade(id) } From 174c595accc60b1128442a7ce0a90fe84031c1b9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 13 Jan 2021 22:19:10 +0100 Subject: [PATCH 02/18] Don't declare in proving window --- extern/storage-sealing/sealing.go | 2 ++ extern/storage-sealing/terminate_batch.go | 34 ++++++++++++++++++----- storage/adapter_storage_miner.go | 13 +++++++-- 3 files changed, 40 insertions(+), 9 deletions(-) diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 662bd8fba..6bf9df0db 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -19,6 +19,7 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" "github.com/filecoin-project/go-state-types/network" statemachine "github.com/filecoin-project/go-statemachine" "github.com/filecoin-project/specs-storage/storage" @@ -60,6 +61,7 @@ type SealingAPI interface { StateMinerSectorAllocated(context.Context, address.Address, abi.SectorNumber, TipSetToken) (bool, error) StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) + StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index fdc61fa21..69593aeda 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -9,12 +9,13 @@ import ( "github.com/ipfs/go-cid" "golang.org/x/xerrors" - "github.com/filecoin-project/go-state-types/big" - miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" - "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" + "github.com/filecoin-project/go-state-types/big" + "github.com/filecoin-project/go-state-types/dline" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" + "github.com/filecoin-project/lotus/api" "github.com/filecoin-project/lotus/chain/actors/builtin/miner" ) @@ -31,6 +32,7 @@ type TerminateBatcherApi interface { StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) + StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) } type TerminateBatcher struct { @@ -84,26 +86,34 @@ func (b *TerminateBatcher) run() { case <-b.force: // user triggered } + dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) + if err != nil { + log.Errorw("TerminateBatcher: getting proving deadline info failed", "error", err) + continue + } + b.lk.Lock() params := miner2.TerminateSectorsParams{} + var total uint64 for loc, sectors := range b.todo { n, err := sectors.Count() if err != nil { log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) } - if notif && n < TerminateBatchMax { - continue - } - if after && n < TerminateBatchMin { + // don't send terminations for currently challenged sectors + if loc.Deadline == dl.Index || (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { continue } + if n < 1 { log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition) continue } + total += n + params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{ Deadline: loc.Deadline, Partition: loc.Partition, @@ -116,6 +126,16 @@ func (b *TerminateBatcher) run() { continue // nothing to do } + if notif && total < TerminateBatchMax { + b.lk.Unlock() + continue + } + + if after && total < TerminateBatchMin { + b.lk.Unlock() + continue + } + enc := new(bytes.Buffer) if err := params.MarshalCBOR(enc); err != nil { log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err) diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 071ad30df..0aa1ad2c7 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -4,8 +4,6 @@ import ( "bytes" "context" - "github.com/filecoin-project/go-state-types/network" - "github.com/ipfs/go-cid" cbg "github.com/whyrusleeping/cbor-gen" "golang.org/x/xerrors" @@ -14,6 +12,8 @@ import ( "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/big" "github.com/filecoin-project/go-state-types/crypto" + "github.com/filecoin-project/go-state-types/dline" + "github.com/filecoin-project/go-state-types/network" market2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/market" @@ -266,6 +266,15 @@ func (s SealingAPIAdapter) StateNetworkVersion(ctx context.Context, tok sealing. return s.delegate.StateNetworkVersion(ctx, tsk) } +func (s SealingAPIAdapter) StateMinerProvingDeadline(ctx context.Context, maddr address.Address, tok sealing.TipSetToken) (*dline.Info, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, err + } + + return s.delegate.StateMinerProvingDeadline(ctx, maddr, tsk) +} + func (s SealingAPIAdapter) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) { msg := types.Message{ To: to, From db977a2f9187ce48cb48517d88d1ada7a6be10e9 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 13 Jan 2021 22:19:42 +0100 Subject: [PATCH 03/18] docsgen --- documentation/en/api-methods-miner.md | 20 +++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/documentation/en/api-methods-miner.md b/documentation/en/api-methods-miner.md index 0a6f8ec27..060e4066a 100644 --- a/documentation/en/api-methods-miner.md +++ b/documentation/en/api-methods-miner.md @@ -99,6 +99,7 @@ * [SectorSetExpectedSealDuration](#SectorSetExpectedSealDuration) * [SectorSetSealDelay](#SectorSetSealDelay) * [SectorStartSealing](#SectorStartSealing) + * [SectorTerminate](#SectorTerminate) * [Sectors](#Sectors) * [SectorsList](#SectorsList) * [SectorsListInStates](#SectorsListInStates) @@ -1475,7 +1476,9 @@ Inputs: Response: `{}` ### SectorRemove -There are not yet any comments for this method. +SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can +be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. + Perms: admin @@ -1535,6 +1538,21 @@ Inputs: Response: `{}` +### SectorTerminate +SectorTerminate terminates the sector on-chain, then automatically removes it from storage + + +Perms: admin + +Inputs: +```json +[ + 9 +] +``` + +Response: `{}` + ## Sectors From 3522c8d45a321fd989b2f652d7c0c44a6fb1b914 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Wed, 13 Jan 2021 23:32:04 +0100 Subject: [PATCH 04/18] SectorTerminateFlush API --- api/api_storage.go | 6 +++- api/apistruct/struct.go | 5 ++++ documentation/en/api-methods-miner.md | 15 +++++++++- extern/storage-sealing/sealing.go | 4 +++ extern/storage-sealing/terminate_batch.go | 36 ++++++++++++++++++++--- node/impl/storminer.go | 4 +++ storage/sealing.go | 6 ++++ 7 files changed, 70 insertions(+), 6 deletions(-) diff --git a/api/api_storage.go b/api/api_storage.go index 51f5c48bd..9e193351d 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -68,8 +68,12 @@ type StorageMiner interface { // SectorRemove removes the sector from storage. It doesn't terminate it on-chain, which can // be done with SectorTerminate. Removing and not terminating live sectors will cause additional penalties. SectorRemove(context.Context, abi.SectorNumber) error - // SectorTerminate terminates the sector on-chain, then automatically removes it from storage + // SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then + // automatically removes it from storage SectorTerminate(context.Context, abi.SectorNumber) error + // SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. + // Returns null if message wasn't sent + SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 7591d92ac..64b1614b6 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -315,6 +315,7 @@ type StorageMinerStruct struct { SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"` SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` + SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm @@ -1315,6 +1316,10 @@ func (c *StorageMinerStruct) SectorTerminate(ctx context.Context, number abi.Sec return c.Internal.SectorTerminate(ctx, number) } +func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) { + return c.Internal.SectorTerminateFlush(ctx) +} + func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error { return c.Internal.SectorMarkForUpgrade(ctx, number) } diff --git a/documentation/en/api-methods-miner.md b/documentation/en/api-methods-miner.md index 060e4066a..05cd1361d 100644 --- a/documentation/en/api-methods-miner.md +++ b/documentation/en/api-methods-miner.md @@ -100,6 +100,7 @@ * [SectorSetSealDelay](#SectorSetSealDelay) * [SectorStartSealing](#SectorStartSealing) * [SectorTerminate](#SectorTerminate) + * [SectorTerminateFlush](#SectorTerminateFlush) * [Sectors](#Sectors) * [SectorsList](#SectorsList) * [SectorsListInStates](#SectorsListInStates) @@ -1539,7 +1540,8 @@ Inputs: Response: `{}` ### SectorTerminate -SectorTerminate terminates the sector on-chain, then automatically removes it from storage +SectorTerminate terminates the sector on-chain (adding it to a termination batch first), then +automatically removes it from storage Perms: admin @@ -1553,6 +1555,17 @@ Inputs: Response: `{}` +### SectorTerminateFlush +SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. +Returns null if message wasn't sent + + +Perms: admin + +Inputs: `null` + +Response: `null` + ## Sectors diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 6bf9df0db..acede4726 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -283,6 +283,10 @@ func (m *Sealing) Terminate(ctx context.Context, sid abi.SectorNumber) error { return m.sectors.Send(uint64(sid), SectorTerminate{}) } +func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) { + return m.terminator.Flush(ctx) +} + // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index 69593aeda..ca68af8ad 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -46,8 +46,9 @@ type TerminateBatcher struct { waiting map[SectorLocation][]chan cid.Cid - notify, force, stop, stopped chan struct{} - lk sync.Mutex + notify, stop, stopped chan struct{} + force chan chan *cid.Cid + lk sync.Mutex } func NewTerminationBatcher(mctx context.Context, maddr address.Address, api TerminateBatcherApi, addrSel AddrSel, feeCfg FeeConfig) *TerminateBatcher { @@ -62,7 +63,7 @@ func NewTerminationBatcher(mctx context.Context, maddr address.Address, api Term waiting: map[SectorLocation][]chan cid.Cid{}, notify: make(chan struct{}, 1), - force: make(chan struct{}), + force: make(chan chan *cid.Cid), stop: make(chan struct{}), stopped: make(chan struct{}), } @@ -73,7 +74,16 @@ func NewTerminationBatcher(mctx context.Context, maddr address.Address, api Term } func (b *TerminateBatcher) run() { + var forceRes chan *cid.Cid + var lastMsg *cid.Cid + for { + if forceRes != nil { + forceRes <- lastMsg + forceRes = nil + } + lastMsg = nil + var notif, after bool select { case <-b.stop: @@ -83,7 +93,8 @@ func (b *TerminateBatcher) run() { notif = true // send above max case <-time.After(TerminateBatchWait): after = true // send above min - case <-b.force: // user triggered + case fr := <-b.force: // user triggered + forceRes = fr } dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) @@ -163,6 +174,7 @@ func (b *TerminateBatcher) run() { b.lk.Unlock() continue } + lastMsg = &mcid log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations)) for _, t := range params.Terminations { @@ -177,6 +189,7 @@ func (b *TerminateBatcher) run() { ch <- mcid // buffered } } + b.waiting = map[SectorLocation][]chan cid.Cid{} b.lk.Unlock() @@ -224,6 +237,21 @@ func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) ( } } +func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) { + resCh := make(chan *cid.Cid, 1) + select { + case b.force <- resCh: + select { + case res := <-resCh: + return res, nil + case <-ctx.Done(): + return nil, ctx.Err() + } + case <-ctx.Done(): + return nil, ctx.Err() + } +} + func (b *TerminateBatcher) Stop(ctx context.Context) error { close(b.stop) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 62b2614f2..970990129 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -332,6 +332,10 @@ func (sm *StorageMinerAPI) SectorTerminate(ctx context.Context, id abi.SectorNum return sm.Miner.TerminateSector(ctx, id) } +func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) { + return sm.Miner.TerminateFlush(ctx) +} + func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { return sm.Miner.MarkForUpgrade(id) } diff --git a/storage/sealing.go b/storage/sealing.go index d3960509d..f987ac882 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -4,6 +4,8 @@ import ( "context" "io" + "github.com/ipfs/go-cid" + "github.com/filecoin-project/go-address" "github.com/filecoin-project/go-state-types/abi" @@ -48,6 +50,10 @@ func (m *Miner) TerminateSector(ctx context.Context, id abi.SectorNumber) error return m.sealing.Terminate(ctx, id) } +func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) { + return m.sealing.TerminateFlush(ctx) +} + func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { return m.sealing.MarkForUpgrade(id) } From 1564db1fcee2be3a6e291cde9d4321ac85eb0725 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 00:11:41 +0100 Subject: [PATCH 05/18] Sector termination test --- api/apistruct/struct.go | 2 +- api/test/window_post.go | 163 +++++++++++++++++++++++ extern/storage-sealing/states_proving.go | 2 +- node/node_test.go | 14 ++ 4 files changed, 179 insertions(+), 2 deletions(-) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 64b1614b6..30ee19262 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -315,7 +315,7 @@ type StorageMinerStruct struct { SectorsUpdate func(context.Context, abi.SectorNumber, api.SectorState) error `perm:"admin"` SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` - SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` + SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm diff --git a/api/test/window_post.go b/api/test/window_post.go index ff107ae8d..e8b12e231 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -14,6 +14,7 @@ import ( "github.com/stretchr/testify/require" "github.com/filecoin-project/go-address" + "github.com/filecoin-project/go-bitfield" "github.com/filecoin-project/go-state-types/abi" "github.com/filecoin-project/go-state-types/network" "github.com/filecoin-project/lotus/extern/sector-storage/mock" @@ -211,6 +212,7 @@ func TestWindowPost(t *testing.T, b APIBuilder, blocktime time.Duration, nSector } } + func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, nSectors int, upgradeHeight abi.ChainEpoch) { ctx, cancel := context.WithCancel(context.Background()) @@ -428,3 +430,164 @@ func testWindowPostUpgrade(t *testing.T, b APIBuilder, blocktime time.Duration, sectors = p.MinerPower.RawBytePower.Uint64() / uint64(ssz) require.Equal(t, nSectors+GenesisPreseals-2+1, int(sectors)) // -2 not recovered sectors + 1 just pledged } + +func TestTerminate(t *testing.T, b APIBuilder, blocktime time.Duration) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + nSectors := uint64(2) + + n, sn := b(t, []FullNodeOpts{FullNodeWithActorsV2At(1)}, []StorageMiner{{Full: 0, Preseal: int(nSectors)}}) + + client := n[0].FullNode.(*impl.FullNodeAPI) + miner := sn[0] + + addrinfo, err := client.NetAddrsListen(ctx) + if err != nil { + t.Fatal(err) + } + + if err := miner.NetConnect(ctx, addrinfo); err != nil { + t.Fatal(err) + } + build.Clock.Sleep(time.Second) + + done := make(chan struct{}) + go func() { + defer close(done) + for ctx.Err() == nil { + build.Clock.Sleep(blocktime) + if err := sn[0].MineOne(ctx, MineNext); err != nil { + if ctx.Err() != nil { + // context was canceled, ignore the error. + return + } + t.Error(err) + } + } + }() + defer func() { + cancel() + <-done + }() + + maddr, err := miner.ActorAddress(ctx) + require.NoError(t, err) + + ssz, err := miner.ActorSectorSize(ctx, maddr) + require.NoError(t, err) + + p, err := client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors)) + + fmt.Printf("Seal a sector\n") + + pledgeSectors(t, ctx, miner, 1, 0, nil) + + fmt.Printf("wait for power\n") + + { + // Wait until proven. + di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + + waitUntil := di.PeriodStart + di.WPoStProvingPeriod + 2 + fmt.Printf("End for head.Height > %d\n", waitUntil) + + for { + head, err := client.ChainHead(ctx) + require.NoError(t, err) + + if head.Height() > waitUntil { + fmt.Printf("Now head.Height = %d\n", head.Height()) + break + } + } + } + + nSectors++ + + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*nSectors)) + + fmt.Println("Terminate a sector") + + toTerminate := abi.SectorNumber(3) + + err = miner.SectorTerminate(ctx, 3) + require.NoError(t, err) + + msgTriggerred := false +loop: + for { + si, err := miner.SectorsStatus(ctx, toTerminate, false) + require.NoError(t, err) + + fmt.Println("state: ", si.State, msgTriggerred) + + switch sealing.SectorState(si.State) { + case sealing.Terminating: + if !msgTriggerred { + c, err := miner.SectorTerminateFlush(ctx) + if err != nil { + return + } + if c != nil { + msgTriggerred = true + fmt.Println("terminate message:", c) + } + } + case sealing.TerminateWait, sealing.TerminateFinality, sealing.Removed: + break loop + } + + time.Sleep(100 * time.Millisecond) + } + + // check power decreased + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1))) + + // check in terminated set + { + parts, err := client.StateMinerPartitions(ctx, maddr, 1, types.EmptyTSK) + require.NoError(t, err) + require.Greater(t, len(parts), 0) + + bflen := func(b bitfield.BitField) uint64 { + l, err := b.Count() + require.NoError(t, err) + return l + } + + require.Equal(t, uint64(1), bflen(parts[0].AllSectors)) + require.Equal(t, uint64(0), bflen(parts[0].LiveSectors)) + } + + di, err := client.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + for { + head, err := client.ChainHead(ctx) + require.NoError(t, err) + + if head.Height() > di.PeriodStart+di.WPoStProvingPeriod+2 { + fmt.Printf("Now head.Height = %d\n", head.Height()) + break + } + build.Clock.Sleep(blocktime) + } + require.NoError(t, err) + fmt.Printf("End for head.Height > %d\n", di.PeriodStart+di.WPoStProvingPeriod+2) + + p, err = client.StateMinerPower(ctx, maddr, types.EmptyTSK) + require.NoError(t, err) + + require.Equal(t, p.MinerPower, p.TotalPower) + require.Equal(t, p.MinerPower.RawBytePower, types.NewInt(uint64(ssz)*(nSectors-1))) +} diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index 5107bd6c3..90ecc3cbf 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -54,7 +54,7 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) if si == nil { // either already terminated or not committed yet // todo / edge case - may be in process of being committed, but let's call that really unlikely - return ctx.Send(SectorRemoved{}) + return ctx.Send(SectorRemove{}) } termCid, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber)) diff --git a/node/node_test.go b/node/node_test.go index 0baa047da..ecc0914ae 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -164,6 +164,20 @@ func TestWindowedPost(t *testing.T) { test.TestWindowPost(t, builder.MockSbBuilder, 2*time.Millisecond, 10) } +func TestTerminate(t *testing.T) { + if os.Getenv("LOTUS_TEST_WINDOW_POST") != "1" { + t.Skip("this takes a few minutes, set LOTUS_TEST_WINDOW_POST=1 to run") + } + + logging.SetLogLevel("miner", "ERROR") + logging.SetLogLevel("chainstore", "ERROR") + logging.SetLogLevel("chain", "ERROR") + logging.SetLogLevel("sub", "ERROR") + logging.SetLogLevel("storageminer", "ERROR") + + test.TestTerminate(t, builder.MockSbBuilder, 2*time.Millisecond) +} + func TestCCUpgrade(t *testing.T) { logging.SetLogLevel("miner", "ERROR") logging.SetLogLevel("chainstore", "ERROR") From 1cfb73cc3bab9fb70ad675fa51eca2dacc930a54 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 00:14:43 +0100 Subject: [PATCH 06/18] ci: Run sector termination tests --- .circleci/config.yml | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/.circleci/config.yml b/.circleci/config.yml index f38c5ba29..04feeedf3 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -200,6 +200,8 @@ jobs: <<: *test test-window-post: <<: *test + test-terminate: + <<: *test test-conformance: description: | Run tests using a corpus of interoperable test vectors for Filecoin @@ -476,9 +478,15 @@ workflows: test-suite-name: cli packages: "./cli/... ./cmd/... ./api/..." - test-window-post: + codecov-upload: true go-test-flags: "-run=TestWindowedPost" winpost-test: "1" test-suite-name: window-post + - test-terminate: + codecov-upload: true + go-test-flags: "-run=TestTerminate" + winpost-test: "1" + test-suite-name: terminate - test-short: go-test-flags: "--timeout 10m --short" test-suite-name: short From 144b5a1350226916771ce84b84cea5686df073b2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:37:23 +0100 Subject: [PATCH 07/18] perning termination API --- api/api_storage.go | 3 ++ api/apistruct/struct.go | 5 ++++ extern/storage-sealing/sealing.go | 4 +++ extern/storage-sealing/terminate_batch.go | 35 +++++++++++++++++++++++ node/impl/storminer.go | 4 +++ storage/addresses.go | 2 ++ storage/sealing.go | 4 +++ 7 files changed, 57 insertions(+) diff --git a/api/api_storage.go b/api/api_storage.go index 9e193351d..042dad73b 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -74,6 +74,8 @@ type StorageMiner interface { // SectorTerminateFlush immediately sends a terminate message with sectors batched for termination. // Returns null if message wasn't sent SectorTerminateFlush(ctx context.Context) (*cid.Cid, error) + // SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message + SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error StorageList(ctx context.Context) (map[stores.ID][]stores.Decl, error) @@ -232,4 +234,5 @@ const ( type AddressConfig struct { PreCommitControl []address.Address CommitControl []address.Address + TerminateControl []address.Address } diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 30ee19262..1569f1b2a 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -316,6 +316,7 @@ type StorageMinerStruct struct { SectorRemove func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminate func(context.Context, abi.SectorNumber) error `perm:"admin"` SectorTerminateFlush func(ctx context.Context) (*cid.Cid, error) `perm:"admin"` + SectorTerminatePending func(ctx context.Context) ([]abi.SectorID, error) `perm:"admin"` SectorMarkForUpgrade func(ctx context.Context, id abi.SectorNumber) error `perm:"admin"` WorkerConnect func(context.Context, string) error `perm:"admin" retry:"true"` // TODO: worker perm @@ -1320,6 +1321,10 @@ func (c *StorageMinerStruct) SectorTerminateFlush(ctx context.Context) (*cid.Cid return c.Internal.SectorTerminateFlush(ctx) } +func (c *StorageMinerStruct) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return c.Internal.SectorTerminatePending(ctx) +} + func (c *StorageMinerStruct) SectorMarkForUpgrade(ctx context.Context, number abi.SectorNumber) error { return c.Internal.SectorMarkForUpgrade(ctx, number) } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index acede4726..891bf8a1a 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -287,6 +287,10 @@ func (m *Sealing) TerminateFlush(ctx context.Context) (*cid.Cid, error) { return m.terminator.Flush(ctx) } +func (m *Sealing) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.terminator.Pending(ctx) +} + // Caller should NOT hold m.unsealedInfoMap.lk func (m *Sealing) StartPacking(sectorID abi.SectorNumber) error { // locking here ensures that when the SectorStartPacking event is sent, the sector won't be picked up anywhere else diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index ca68af8ad..53eec2d82 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -3,6 +3,7 @@ package sealing import ( "bytes" "context" + "sort" "sync" "time" @@ -252,6 +253,40 @@ func (b *TerminateBatcher) Flush(ctx context.Context) (*cid.Cid, error) { } } +func (b *TerminateBatcher) Pending(ctx context.Context) ([]abi.SectorID, error) { + b.lk.Lock() + defer b.lk.Unlock() + + mid, err := address.IDFromAddress(b.maddr) + if err != nil { + return nil, err + } + + res := make([]abi.SectorID, 0) + for _, bf := range b.todo { + err := bf.ForEach(func(id uint64) error { + res = append(res, abi.SectorID{ + Miner: abi.ActorID(mid), + Number: abi.SectorNumber(id), + }) + return nil + }) + if err != nil { + return nil, err + } + } + + sort.Slice(res, func(i, j int) bool { + if res[i].Miner != res[j].Miner { + return res[i].Miner < res[j].Miner + } + + return res[i].Number < res[j].Number + }) + + return res, nil +} + func (b *TerminateBatcher) Stop(ctx context.Context) error { close(b.stop) diff --git a/node/impl/storminer.go b/node/impl/storminer.go index 970990129..fe79817a5 100644 --- a/node/impl/storminer.go +++ b/node/impl/storminer.go @@ -336,6 +336,10 @@ func (sm *StorageMinerAPI) SectorTerminateFlush(ctx context.Context) (*cid.Cid, return sm.Miner.TerminateFlush(ctx) } +func (sm *StorageMinerAPI) SectorTerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return sm.Miner.TerminatePending(ctx) +} + func (sm *StorageMinerAPI) SectorMarkForUpgrade(ctx context.Context, id abi.SectorNumber) error { return sm.Miner.MarkForUpgrade(id) } diff --git a/storage/addresses.go b/storage/addresses.go index 5da8643cd..f406394b4 100644 --- a/storage/addresses.go +++ b/storage/addresses.go @@ -30,6 +30,8 @@ func (as *AddressSelector) AddressFor(ctx context.Context, a addrSelectApi, mi m addrs = append(addrs, as.PreCommitControl...) case api.CommitAddr: addrs = append(addrs, as.CommitControl...) + case api.TerminateSectorsAddr: + addrs = append(addrs, as.TerminateControl...) default: defaultCtl := map[address.Address]struct{}{} for _, a := range mi.ControlAddresses { diff --git a/storage/sealing.go b/storage/sealing.go index f987ac882..d07a14810 100644 --- a/storage/sealing.go +++ b/storage/sealing.go @@ -54,6 +54,10 @@ func (m *Miner) TerminateFlush(ctx context.Context) (*cid.Cid, error) { return m.sealing.TerminateFlush(ctx) } +func (m *Miner) TerminatePending(ctx context.Context) ([]abi.SectorID, error) { + return m.sealing.TerminatePending(ctx) +} + func (m *Miner) MarkForUpgrade(id abi.SectorNumber) error { return m.sealing.MarkForUpgrade(id) } From 785b66072478a9694bd3c7d40ae1009e4f0dbe36 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:37:31 +0100 Subject: [PATCH 08/18] perning termination CLI --- cmd/lotus-storage-miner/sectors.go | 58 ++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index fa21d1eea..fe8905204 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -407,6 +407,10 @@ var sectorsTerminateCmd = &cli.Command{ Usage: "pass this flag if you know what you are doing", }, }, + Subcommands: []*cli.Command{ + sectorsTerminateFlushCmd, + sectorsTerminatePendingCmd, + }, Action: func(cctx *cli.Context) error { if !cctx.Bool("really-do-it") { return xerrors.Errorf("pass --really-do-it to confirm this action") @@ -430,6 +434,60 @@ var sectorsTerminateCmd = &cli.Command{ }, } +var sectorsTerminateFlushCmd = &cli.Command{ + Name: "flush", + Usage: "Send a terminate message if there are sectors queued for termination", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + if cctx.Args().Len() != 1 { + return xerrors.Errorf("must pass sector number") + } + + mcid, err := nodeApi.SectorTerminateFlush(ctx) + if err != nil { + return err + } + + if mcid == nil { + return xerrors.New("no sectors were queued for termination") + } + + return nil + }, +} + +var sectorsTerminatePendingCmd = &cli.Command{ + Name: "pending", + Usage: "List sector numbers of sectors pending termination", + Action: func(cctx *cli.Context) error { + nodeApi, closer, err := lcli.GetStorageMinerAPI(cctx) + if err != nil { + return err + } + defer closer() + ctx := lcli.ReqContext(cctx) + if cctx.Args().Len() != 1 { + return xerrors.Errorf("must pass sector number") + } + + pending, err := nodeApi.SectorTerminatePending(ctx) + if err != nil { + return err + } + + for _, id := range pending { + fmt.Println(id) + } + + return nil + }, +} + var sectorsRemoveCmd = &cli.Command{ Name: "remove", Usage: "Forcefully remove a sector (WARNING: This means losing power and collateral for the removed sector (use 'terminate' for lower penalty))", From f01f1d377b024f692264d59f0d3e6b70985c402a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:41:27 +0100 Subject: [PATCH 09/18] Test pending terminations --- api/test/window_post.go | 19 +++++++++++++++---- build/version.go | 2 +- 2 files changed, 16 insertions(+), 5 deletions(-) diff --git a/api/test/window_post.go b/api/test/window_post.go index e8b12e231..84d668c76 100644 --- a/api/test/window_post.go +++ b/api/test/window_post.go @@ -518,7 +518,7 @@ func TestTerminate(t *testing.T, b APIBuilder, blocktime time.Duration) { toTerminate := abi.SectorNumber(3) - err = miner.SectorTerminate(ctx, 3) + err = miner.SectorTerminate(ctx, toTerminate) require.NoError(t, err) msgTriggerred := false @@ -532,13 +532,24 @@ loop: switch sealing.SectorState(si.State) { case sealing.Terminating: if !msgTriggerred { - c, err := miner.SectorTerminateFlush(ctx) - if err != nil { - return + { + p, err := miner.SectorTerminatePending(ctx) + require.NoError(t, err) + require.Len(t, p, 1) + require.Equal(t, abi.SectorNumber(3), p[0].Number) } + + c, err := miner.SectorTerminateFlush(ctx) + require.NoError(t, err) if c != nil { msgTriggerred = true fmt.Println("terminate message:", c) + + { + p, err := miner.SectorTerminatePending(ctx) + require.NoError(t, err) + require.Len(t, p, 0) + } } } case sealing.TerminateWait, sealing.TerminateFinality, sealing.Removed: diff --git a/build/version.go b/build/version.go index 140456581..fd13100a5 100644 --- a/build/version.go +++ b/build/version.go @@ -84,7 +84,7 @@ func VersionForType(nodeType NodeType) (Version, error) { // semver versions of the rpc api exposed var ( FullAPIVersion = newVer(1, 0, 0) - MinerAPIVersion = newVer(1, 0, 0) + MinerAPIVersion = newVer(1, 0, 1) WorkerAPIVersion = newVer(1, 0, 0) ) From 7ddf1d1feb1c649fcecaf0787b514b416e6c3ddc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:44:27 +0100 Subject: [PATCH 10/18] Add terminating states to state lists --- cmd/lotus-storage-miner/info.go | 4 ++++ extern/storage-sealing/sector_state.go | 6 +++++- 2 files changed, 9 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/info.go b/cmd/lotus-storage-miner/info.go index ed74da96b..8c6297339 100644 --- a/cmd/lotus-storage-miner/info.go +++ b/cmd/lotus-storage-miner/info.go @@ -298,6 +298,10 @@ var stateList = []stateMeta{ {col: color.FgYellow, state: sealing.CommitWait}, {col: color.FgYellow, state: sealing.FinalizeSector}, + {col: color.FgCyan, state: sealing.Terminating}, + {col: color.FgCyan, state: sealing.TerminateWait}, + {col: color.FgCyan, state: sealing.TerminateFinality}, + {col: color.FgCyan, state: sealing.TerminateFailed}, {col: color.FgCyan, state: sealing.Removing}, {col: color.FgCyan, state: sealing.Removed}, diff --git a/extern/storage-sealing/sector_state.go b/extern/storage-sealing/sector_state.go index 08c9c1b11..49a607958 100644 --- a/extern/storage-sealing/sector_state.go +++ b/extern/storage-sealing/sector_state.go @@ -30,6 +30,10 @@ var ExistSectorStateList = map[SectorState]struct{}{ Faulty: {}, FaultReported: {}, FaultedFinal: {}, + Terminating: {}, + TerminateWait: {}, + TerminateFinality: {}, + TerminateFailed: {}, Removing: {}, RemoveFailed: {}, Removed: {}, @@ -83,7 +87,7 @@ func toStatState(st SectorState) statSectorState { switch st { case Empty, WaitDeals, Packing, GetTicket, PreCommit1, PreCommit2, PreCommitting, PreCommitWait, WaitSeed, Committing, SubmitCommit, CommitWait, FinalizeSector: return sstSealing - case Proving, Removed, Removing: + case Proving, Removed, Removing, Terminating, TerminateWait, TerminateFinality, TerminateFailed: return sstProving } From 18d38ca42fe059fe85bd1e36c09744e15ec9525c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 12:52:08 +0100 Subject: [PATCH 11/18] docsgen --- documentation/en/api-methods-miner.md | 14 +++++++++++++- 1 file changed, 13 insertions(+), 1 deletion(-) diff --git a/documentation/en/api-methods-miner.md b/documentation/en/api-methods-miner.md index 05cd1361d..66512a02c 100644 --- a/documentation/en/api-methods-miner.md +++ b/documentation/en/api-methods-miner.md @@ -101,6 +101,7 @@ * [SectorStartSealing](#SectorStartSealing) * [SectorTerminate](#SectorTerminate) * [SectorTerminateFlush](#SectorTerminateFlush) + * [SectorTerminatePending](#SectorTerminatePending) * [Sectors](#Sectors) * [SectorsList](#SectorsList) * [SectorsListInStates](#SectorsListInStates) @@ -195,7 +196,8 @@ Response: ```json { "PreCommitControl": null, - "CommitControl": null + "CommitControl": null, + "TerminateControl": null } ``` @@ -1560,6 +1562,16 @@ SectorTerminateFlush immediately sends a terminate message with sectors batched Returns null if message wasn't sent +Perms: admin + +Inputs: `null` + +Response: `null` + +### SectorTerminatePending +SectorTerminatePending returns a list of pending sector terminations to be sent in the next batch message + + Perms: admin Inputs: `null` From 49abdd7d7d22957693b80062036053593eb78524 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 15:46:57 +0100 Subject: [PATCH 12/18] Sector termination support - address review --- chain/actors/builtin/miner/miner.go | 4 + extern/storage-sealing/states_failed.go | 12 +- extern/storage-sealing/states_proving.go | 10 +- extern/storage-sealing/terminate_batch.go | 213 ++++++++++++---------- 4 files changed, 142 insertions(+), 97 deletions(-) diff --git a/chain/actors/builtin/miner/miner.go b/chain/actors/builtin/miner/miner.go index 5821d092b..1caf64c97 100644 --- a/chain/actors/builtin/miner/miner.go +++ b/chain/actors/builtin/miner/miner.go @@ -20,6 +20,7 @@ import ( builtin0 "github.com/filecoin-project/specs-actors/actors/builtin" miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner" builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin" + miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner" ) func init() { @@ -42,6 +43,9 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff const MinSectorExpiration = miner0.MinSectorExpiration +// Not used / checked in v0 +var DeclarationsMax = miner2.DeclarationsMax + func Load(store adt.Store, act *types.Actor) (st State, err error) { switch act.Code { case builtin0.StorageMinerActorCodeID: diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 1dbf99cda..07fa2ed70 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -309,8 +309,16 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRemove{}) } -func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error { - if err := failedCooldown(ctx, sector); err != nil { +func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, si SectorInfo) error { + // ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to + // the Terminating state after cooldown. If the API is still failing, well get back to here + // with the error in SectorInfo log. + pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil) + if pci != nil { + return nil // pause the fsm, needs manual user action + } + + if err := failedCooldown(ctx, si); err != nil { return err } diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index 90ecc3cbf..b24746d47 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -53,7 +53,15 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) if si == nil { // either already terminated or not committed yet - // todo / edge case - may be in process of being committed, but let's call that really unlikely + + pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil) + if err != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)}) + } + if pci != nil { + return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")}) + } + return ctx.Send(SectorRemove{}) } diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index 53eec2d82..2eae0b08f 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -98,105 +98,130 @@ func (b *TerminateBatcher) run() { forceRes = fr } - dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) + var err error + lastMsg, err = b.processBatch(notif, after) if err != nil { - log.Errorw("TerminateBatcher: getting proving deadline info failed", "error", err) - continue + log.Warnw("TerminateBatcher processBatch error", "error", err) } - - b.lk.Lock() - params := miner2.TerminateSectorsParams{} - - var total uint64 - for loc, sectors := range b.todo { - n, err := sectors.Count() - if err != nil { - log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) - } - - // don't send terminations for currently challenged sectors - if loc.Deadline == dl.Index || (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { - continue - } - - if n < 1 { - log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition) - continue - } - - total += n - - params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{ - Deadline: loc.Deadline, - Partition: loc.Partition, - Sectors: *sectors, - }) - } - - if len(params.Terminations) == 0 { - b.lk.Unlock() - continue // nothing to do - } - - if notif && total < TerminateBatchMax { - b.lk.Unlock() - continue - } - - if after && total < TerminateBatchMin { - b.lk.Unlock() - continue - } - - enc := new(bytes.Buffer) - if err := params.MarshalCBOR(enc); err != nil { - log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err) - b.lk.Unlock() - continue - } - - mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) - if err != nil { - log.Warnw("TerminateBatcher: couldn't get miner info", "error", err) - b.lk.Unlock() - continue - } - - from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee) - if err != nil { - log.Warnw("TerminateBatcher: no good address found", "error", err) - b.lk.Unlock() - continue - } - - mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes()) - if err != nil { - log.Errorw("TerminateBatcher: sending message failed", "error", err) - b.lk.Unlock() - continue - } - lastMsg = &mcid - log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations)) - - for _, t := range params.Terminations { - delete(b.todo, SectorLocation{ - Deadline: t.Deadline, - Partition: t.Partition, - }) - } - - for _, w := range b.waiting { - for _, ch := range w { - ch <- mcid // buffered - } - } - - b.waiting = map[SectorLocation][]chan cid.Cid{} - - b.lk.Unlock() } } +func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { + dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil) + if err != nil { + return nil, xerrors.Errorf("getting proving deadline info failed: %w", err) + } + + b.lk.Lock() + defer b.lk.Unlock() + params := miner2.TerminateSectorsParams{} + + var total uint64 + for loc, sectors := range b.todo { + n, err := sectors.Count() + if err != nil { + log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + // don't send terminations for currently challenged sectors + if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain) + loc.Deadline == dl.Index || // not in current + (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous + continue + } + + if n < 1 { + log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition) + continue + } + + toTerminate, err := sectors.Copy() + if err != nil { + log.Warnw("TerminateBatcher: copy sectors bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + if total+n > uint64(miner.DeclarationsMax) { + n = uint64(miner.DeclarationsMax) - total + + toTerminate, err = toTerminate.Slice(0, n) + if err != nil { + log.Warnw("TerminateBatcher: slice toTerminate bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + + *sectors, err = bitfield.SubtractBitField(*sectors, toTerminate) + if err != nil { + log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) + continue + } + } + + total += n + + params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{ + Deadline: loc.Deadline, + Partition: loc.Partition, + Sectors: toTerminate, + }) + + if total >= uint64(miner.DeclarationsMax) { + break + } + } + + if len(params.Terminations) == 0 { + return nil, nil // nothing to do + } + + if notif && total < TerminateBatchMax { + return nil, nil + } + + if after && total < TerminateBatchMin { + return nil, nil + } + + enc := new(bytes.Buffer) + if err := params.MarshalCBOR(enc); err != nil { + return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err) + } + + mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil) + if err != nil { + return nil, xerrors.Errorf("couldn't get miner info: %w", err) + } + + from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee) + if err != nil { + return nil, xerrors.Errorf("no good address found: %w", err) + } + + mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes()) + if err != nil { + return nil, xerrors.Errorf("sending message failed: %w", err) + } + log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations)) + + for _, t := range params.Terminations { + delete(b.todo, SectorLocation{ + Deadline: t.Deadline, + Partition: t.Partition, + }) + } + + for _, w := range b.waiting { + for _, ch := range w { + ch <- mcid // buffered + } + } + + b.waiting = map[SectorLocation][]chan cid.Cid{} + + return &mcid, nil +} + // register termination, wait for batch message, return message CID func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) { maddr, err := address.NewIDAddress(uint64(s.Miner)) From 4015ddbb4fff8c251ea732dc4539c4f20c7a61f7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 16:13:32 +0100 Subject: [PATCH 13/18] fsm: Fix panic in precommit check in handleTerminating --- extern/storage-sealing/states_failed.go | 6 +++--- extern/storage-sealing/states_proving.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index 07fa2ed70..4be654721 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -309,16 +309,16 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRemove{}) } -func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, si SectorInfo) error { +func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error { // ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to // the Terminating state after cooldown. If the API is still failing, well get back to here // with the error in SectorInfo log. - pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil) + pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if pci != nil { return nil // pause the fsm, needs manual user action } - if err := failedCooldown(ctx, si); err != nil { + if err := failedCooldown(ctx, sector); err != nil { return err } diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index b24746d47..d59db7e75 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -54,7 +54,7 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) if si == nil { // either already terminated or not committed yet - pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil) + pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)}) } From 4a114150f0af84afc5be9739c1ae4c35d2d2e3dd Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 16:14:24 +0100 Subject: [PATCH 14/18] fix miner terminate cli arg checks --- cmd/lotus-storage-miner/sectors.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index fe8905204..110572b78 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -444,9 +444,6 @@ var sectorsTerminateFlushCmd = &cli.Command{ } defer closer() ctx := lcli.ReqContext(cctx) - if cctx.Args().Len() != 1 { - return xerrors.Errorf("must pass sector number") - } mcid, err := nodeApi.SectorTerminateFlush(ctx) if err != nil { @@ -471,9 +468,6 @@ var sectorsTerminatePendingCmd = &cli.Command{ } defer closer() ctx := lcli.ReqContext(cctx) - if cctx.Args().Len() != 1 { - return xerrors.Errorf("must pass sector number") - } pending, err := nodeApi.SectorTerminatePending(ctx) if err != nil { From 9632a3836aa0764585331aa7eef8fc0944cbb187 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 16:47:30 +0100 Subject: [PATCH 15/18] Print msg cid in terminate flush --- cmd/lotus-storage-miner/sectors.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 110572b78..8e3b0bfc4 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -454,6 +454,8 @@ var sectorsTerminateFlushCmd = &cli.Command{ return xerrors.New("no sectors were queued for termination") } + fmt.Println(mcid) + return nil }, } @@ -475,7 +477,7 @@ var sectorsTerminatePendingCmd = &cli.Command{ } for _, id := range pending { - fmt.Println(id) + fmt.Println(id.Number) } return nil From 32885e1129178582fbf75711b249f5b8d99663c3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 17:14:26 +0100 Subject: [PATCH 16/18] termination batcher: Notify based on what was sent --- extern/storage-sealing/fsm_events.go | 4 +- extern/storage-sealing/sealing.go | 1 + extern/storage-sealing/states_proving.go | 9 ++-- extern/storage-sealing/terminate_batch.go | 52 ++++++++++++++++------- storage/adapter_storage_miner.go | 9 ++++ 5 files changed, 55 insertions(+), 20 deletions(-) diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index 167217080..e28366721 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -323,10 +323,10 @@ func (evt SectorTerminate) applyGlobal(state *SectorInfo) bool { return true } -type SectorTerminating struct{ Message cid.Cid } +type SectorTerminating struct{ Message *cid.Cid } func (evt SectorTerminating) apply(state *SectorInfo) { - state.TerminateMessage = &evt.Message + state.TerminateMessage = evt.Message } type SectorTerminated struct{ TerminatedAt abi.ChainEpoch } diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 891bf8a1a..96d63efdc 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -62,6 +62,7 @@ type SealingAPI interface { StateMarketStorageDeal(context.Context, abi.DealID, TipSetToken) (market.DealProposal, error) StateNetworkVersion(ctx context.Context, tok TipSetToken) (network.Version, error) StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) + StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error) SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) ChainHead(ctx context.Context) (TipSetToken, abi.ChainEpoch, error) ChainGetRandomnessFromBeacon(ctx context.Context, tok TipSetToken, personalization crypto.DomainSeparationTag, randEpoch abi.ChainEpoch, entropy []byte) (abi.Randomness, error) diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go index d59db7e75..212fd906f 100644 --- a/extern/storage-sealing/states_proving.go +++ b/extern/storage-sealing/states_proving.go @@ -7,7 +7,6 @@ import ( "github.com/filecoin-project/go-state-types/exitcode" "github.com/filecoin-project/go-statemachine" - "github.com/filecoin-project/lotus/build" "github.com/filecoin-project/lotus/chain/actors/policy" ) @@ -65,12 +64,16 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo) return ctx.Send(SectorRemove{}) } - termCid, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber)) + termCid, terminated, err := m.terminator.AddTermination(ctx.Context(), m.minerSectorID(sector.SectorNumber)) if err != nil { return ctx.Send(SectorTerminateFailed{xerrors.Errorf("queueing termination: %w", err)}) } - return ctx.Send(SectorTerminating{Message: termCid}) + if terminated { + return ctx.Send(SectorTerminating{Message: nil}) + } + + return ctx.Send(SectorTerminating{Message: &termCid}) } func (m *Sealing) handleTerminateWait(ctx statemachine.Context, sector SectorInfo) error { diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index 2eae0b08f..b86782808 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -34,6 +34,7 @@ type TerminateBatcherApi interface { SendMsg(ctx context.Context, from, to address.Address, method abi.MethodNum, value, maxFee abi.TokenAmount, params []byte) (cid.Cid, error) StateMinerInfo(context.Context, address.Address, TipSetToken) (miner.MinerInfo, error) StateMinerProvingDeadline(context.Context, address.Address, TipSetToken) (*dline.Info, error) + StateMinerPartitions(ctx context.Context, m address.Address, dlIdx uint64, tok TipSetToken) ([]api.Partition, error) } type TerminateBatcher struct { @@ -45,7 +46,7 @@ type TerminateBatcher struct { todo map[SectorLocation]*bitfield.BitField // MinerSectorLocation -> BitField - waiting map[SectorLocation][]chan cid.Cid + waiting map[abi.SectorNumber][]chan cid.Cid notify, stop, stopped chan struct{} force chan chan *cid.Cid @@ -61,7 +62,7 @@ func NewTerminationBatcher(mctx context.Context, maddr address.Address, api Term feeCfg: feeCfg, todo: map[SectorLocation]*bitfield.BitField{}, - waiting: map[SectorLocation][]chan cid.Cid{}, + waiting: map[abi.SectorNumber][]chan cid.Cid{}, notify: make(chan struct{}, 1), force: make(chan chan *cid.Cid), @@ -209,32 +210,53 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { Deadline: t.Deadline, Partition: t.Partition, }) - } - for _, w := range b.waiting { - for _, ch := range w { - ch <- mcid // buffered + err := t.Sectors.ForEach(func(sn uint64) error { + for _, ch := range b.waiting[abi.SectorNumber(sn)] { + ch <- mcid // buffered + } + delete(b.waiting, abi.SectorNumber(sn)) + + return nil + }) + if err != nil { + return nil, xerrors.Errorf("sectors foreach: %w", err) } } - b.waiting = map[SectorLocation][]chan cid.Cid{} - return &mcid, nil } // register termination, wait for batch message, return message CID -func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) { +// can return cid.Undef,true if the sector is already terminated on-chain +func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (mcid cid.Cid, terminated bool, err error) { maddr, err := address.NewIDAddress(uint64(s.Miner)) if err != nil { - return cid.Undef, err + return cid.Undef, false, err } loc, err := b.api.StateSectorPartition(ctx, maddr, s.Number, nil) if err != nil { - return cid.Undef, xerrors.Errorf("getting sector location: %w", err) + return cid.Undef, false, xerrors.Errorf("getting sector location: %w", err) } if loc == nil { - return cid.Undef, xerrors.New("sector location not found") + return cid.Undef, false, xerrors.New("sector location not found") + } + + { + // check if maybe already terminated + parts, err := b.api.StateMinerPartitions(ctx, maddr, loc.Deadline, nil) + if err != nil { + return cid.Cid{}, false, xerrors.Errorf("getting partitions: %w", err) + } + live, err := parts[loc.Partition].LiveSectors.IsSet(uint64(s.Number)) + if err != nil { + return cid.Cid{}, false, xerrors.Errorf("checking if sector is in live set: %w", err) + } + if !live { + // already terminated + return cid.Undef, true, nil + } } b.lk.Lock() @@ -247,7 +269,7 @@ func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) ( bf.Set(uint64(s.Number)) sent := make(chan cid.Cid, 1) - b.waiting[*loc] = append(b.waiting[*loc], sent) + b.waiting[s.Number] = append(b.waiting[s.Number], sent) select { case b.notify <- struct{}{}: @@ -257,9 +279,9 @@ func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) ( select { case c := <-sent: - return c, nil + return c, false, nil case <-ctx.Done(): - return cid.Undef, ctx.Err() + return cid.Undef, false, ctx.Err() } } diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 0aa1ad2c7..20bf30825 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -243,6 +243,15 @@ func (s SealingAPIAdapter) StateSectorPartition(ctx context.Context, maddr addre return nil, nil // not found } +func (s SealingAPIAdapter) StateMinerPartitions(ctx context.Context, maddr address.Address, dlIdx uint64, tok sealing.TipSetToken) ([]api.Partition, error) { + tsk, err := types.TipSetKeyFromBytes(tok) + if err != nil { + return nil, xerrors.Errorf("failed to unmarshal TipSetToken to TipSetKey: %w", err) + } + + return s.delegate.StateMinerPartitions(ctx, maddr, dlIdx, tsk) +} + func (s SealingAPIAdapter) StateMarketStorageDeal(ctx context.Context, dealID abi.DealID, tok sealing.TipSetToken) (market.DealProposal, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { From 1e53ed6a82b00a0d8cb25f54980ee17bfabe0191 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 17:27:28 +0100 Subject: [PATCH 17/18] Print proving window info in pending termination list --- cmd/lotus-storage-miner/sectors.go | 29 ++++++++++++++++++++++++++++- 1 file changed, 28 insertions(+), 1 deletion(-) diff --git a/cmd/lotus-storage-miner/sectors.go b/cmd/lotus-storage-miner/sectors.go index 8e3b0bfc4..5ef067b2c 100644 --- a/cmd/lotus-storage-miner/sectors.go +++ b/cmd/lotus-storage-miner/sectors.go @@ -469,6 +469,11 @@ var sectorsTerminatePendingCmd = &cli.Command{ return err } defer closer() + api, nCloser, err := lcli.GetFullNodeAPI(cctx) + if err != nil { + return err + } + defer nCloser() ctx := lcli.ReqContext(cctx) pending, err := nodeApi.SectorTerminatePending(ctx) @@ -476,8 +481,30 @@ var sectorsTerminatePendingCmd = &cli.Command{ return err } + maddr, err := nodeApi.ActorAddress(ctx) + if err != nil { + return err + } + + dl, err := api.StateMinerProvingDeadline(ctx, maddr, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("getting proving deadline info failed: %w", err) + } + for _, id := range pending { - fmt.Println(id.Number) + loc, err := api.StateSectorPartition(ctx, maddr, id.Number, types.EmptyTSK) + if err != nil { + return xerrors.Errorf("finding sector partition: %w", err) + } + + fmt.Print(id.Number) + + if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain) + loc.Deadline == dl.Index || // not in current + (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous + fmt.Print(" (in proving window)") + } + fmt.Println() } return nil From 80b8d4b9d7d0f01a531368fa0ff7c762122c5c2c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 14 Jan 2021 20:27:15 +0100 Subject: [PATCH 18/18] Address review --- chain/actors/builtin/miner/miner.go | 1 + extern/storage-sealing/terminate_batch.go | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 8 deletions(-) diff --git a/chain/actors/builtin/miner/miner.go b/chain/actors/builtin/miner/miner.go index 1caf64c97..066dc9bfd 100644 --- a/chain/actors/builtin/miner/miner.go +++ b/chain/actors/builtin/miner/miner.go @@ -45,6 +45,7 @@ const MinSectorExpiration = miner0.MinSectorExpiration // Not used / checked in v0 var DeclarationsMax = miner2.DeclarationsMax +var AddressedSectorsMax = miner2.AddressedSectorsMax func Load(store adt.Store, act *types.Actor) (st State, err error) { switch act.Code { diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go index b86782808..31ccef93c 100644 --- a/extern/storage-sealing/terminate_batch.go +++ b/extern/storage-sealing/terminate_batch.go @@ -86,21 +86,21 @@ func (b *TerminateBatcher) run() { } lastMsg = nil - var notif, after bool + var sendAboveMax, sendAboveMin bool select { case <-b.stop: close(b.stopped) return case <-b.notify: - notif = true // send above max + sendAboveMax = true case <-time.After(TerminateBatchWait): - after = true // send above min + sendAboveMin = true case fr := <-b.force: // user triggered forceRes = fr } var err error - lastMsg, err = b.processBatch(notif, after) + lastMsg, err = b.processBatch(sendAboveMax, sendAboveMin) if err != nil { log.Warnw("TerminateBatcher processBatch error", "error", err) } @@ -143,8 +143,8 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { continue } - if total+n > uint64(miner.DeclarationsMax) { - n = uint64(miner.DeclarationsMax) - total + if total+n > uint64(miner.AddressedSectorsMax) { + n = uint64(miner.AddressedSectorsMax) - total toTerminate, err = toTerminate.Slice(0, n) if err != nil { @@ -152,11 +152,12 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { continue } - *sectors, err = bitfield.SubtractBitField(*sectors, toTerminate) + s, err := bitfield.SubtractBitField(*sectors, toTerminate) if err != nil { log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err) continue } + *sectors = s } total += n @@ -167,7 +168,11 @@ func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) { Sectors: toTerminate, }) - if total >= uint64(miner.DeclarationsMax) { + if total >= uint64(miner.AddressedSectorsMax) { + break + } + + if len(params.Terminations) >= miner.DeclarationsMax { break } }