From 49abdd7d7d22957693b80062036053593eb78524 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=C5=81ukasz=20Magiera?=
Date: Thu, 14 Jan 2021 15:46:57 +0100
Subject: [PATCH] Sector termination support - address review

- miner: export DeclarationsMax (taken from the v2 miner actor).
- handleTerminateFailed: pause the FSM when the sector still has a
  precommit on chain, as that needs manual user action.
- handleTerminating: check for an on-chain precommit before falling back
  to SectorRemove, and fail the termination if one is present.
- TerminateBatcher: move the per-iteration logic out of run() into
  processBatch, cap a batch at DeclarationsMax sectors, and also skip
  partitions in the next proving deadline.

---
 chain/actors/builtin/miner/miner.go       |   4 +
 extern/storage-sealing/states_failed.go   |  12 +-
 extern/storage-sealing/states_proving.go  |  10 +-
 extern/storage-sealing/terminate_batch.go | 218 ++++++++++++----------
 4 files changed, 147 insertions(+), 97 deletions(-)

diff --git a/chain/actors/builtin/miner/miner.go b/chain/actors/builtin/miner/miner.go
index 5821d092b..1caf64c97 100644
--- a/chain/actors/builtin/miner/miner.go
+++ b/chain/actors/builtin/miner/miner.go
@@ -20,6 +20,7 @@ import (
 	builtin0 "github.com/filecoin-project/specs-actors/actors/builtin"
 	miner0 "github.com/filecoin-project/specs-actors/actors/builtin/miner"
 	builtin2 "github.com/filecoin-project/specs-actors/v2/actors/builtin"
+	miner2 "github.com/filecoin-project/specs-actors/v2/actors/builtin/miner"
 )
 
 func init() {
@@ -42,6 +43,9 @@ var FaultDeclarationCutoff = miner0.FaultDeclarationCutoff
 
 const MinSectorExpiration = miner0.MinSectorExpiration
 
+// Not used / checked in v0
+var DeclarationsMax = miner2.DeclarationsMax
+
 func Load(store adt.Store, act *types.Actor) (st State, err error) {
 	switch act.Code {
 	case builtin0.StorageMinerActorCodeID:
diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go
index 1dbf99cda..07fa2ed70 100644
--- a/extern/storage-sealing/states_failed.go
+++ b/extern/storage-sealing/states_failed.go
@@ -309,8 +309,16 @@ func (m *Sealing) handleRemoveFailed(ctx statemachine.Context, sector SectorInfo
 	return ctx.Send(SectorRemove{})
 }
 
-func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, sector SectorInfo) error {
-	if err := failedCooldown(ctx, sector); err != nil {
+func (m *Sealing) handleTerminateFailed(ctx statemachine.Context, si SectorInfo) error {
+	// ignoring error as it's most likely an API error - `pci` will be nil, and we'll go back to
+	// the Terminating state after cooldown. If the API is still failing, we'll get back to here
+	// with the error in SectorInfo log.
+	pci, _ := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, si.SectorNumber, nil)
+	if pci != nil {
+		return nil // pause the fsm, needs manual user action
+	}
+
+	if err := failedCooldown(ctx, si); err != nil {
 		return err
 	}
 
diff --git a/extern/storage-sealing/states_proving.go b/extern/storage-sealing/states_proving.go
index 90ecc3cbf..b24746d47 100644
--- a/extern/storage-sealing/states_proving.go
+++ b/extern/storage-sealing/states_proving.go
@@ -53,7 +53,15 @@ func (m *Sealing) handleTerminating(ctx statemachine.Context, sector SectorInfo)
 
 	if si == nil {
 		// either already terminated or not committed yet
-		// todo / edge case - may be in process of being committed, but let's call that really unlikely
+
+		pci, err := m.api.StateSectorPreCommitInfo(ctx.Context(), m.maddr, sector.SectorNumber, nil)
+		if err != nil {
+			return ctx.Send(SectorTerminateFailed{xerrors.Errorf("checking precommit presence: %w", err)})
+		}
+		if pci != nil {
+			return ctx.Send(SectorTerminateFailed{xerrors.Errorf("sector was precommitted but not proven, remove instead of terminating")})
+		}
+
 		return ctx.Send(SectorRemove{})
 	}
 
diff --git a/extern/storage-sealing/terminate_batch.go b/extern/storage-sealing/terminate_batch.go
index 53eec2d82..2eae0b08f 100644
--- a/extern/storage-sealing/terminate_batch.go
+++ b/extern/storage-sealing/terminate_batch.go
@@ -98,105 +98,135 @@ func (b *TerminateBatcher) run() {
 			forceRes = fr
 		}
 
-		dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil)
+		var err error
+		lastMsg, err = b.processBatch(notif, after)
 		if err != nil {
-			log.Errorw("TerminateBatcher: getting proving deadline info failed", "error", err)
-			continue
+			log.Warnw("TerminateBatcher processBatch error", "error", err)
 		}
-
-		b.lk.Lock()
-		params := miner2.TerminateSectorsParams{}
-
-		var total uint64
-		for loc, sectors := range b.todo {
-			n, err := sectors.Count()
-			if err != nil {
-				log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
-			}
-
-			// don't send terminations for currently challenged sectors
-			if loc.Deadline == dl.Index || (loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index {
-				continue
-			}
-
-			if n < 1 {
-				log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
-				continue
-			}
-
-			total += n
-
-			params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
-				Deadline:  loc.Deadline,
-				Partition: loc.Partition,
-				Sectors:   *sectors,
-			})
-		}
-
-		if len(params.Terminations) == 0 {
-			b.lk.Unlock()
-			continue // nothing to do
-		}
-
-		if notif && total < TerminateBatchMax {
-			b.lk.Unlock()
-			continue
-		}
-
-		if after && total < TerminateBatchMin {
-			b.lk.Unlock()
-			continue
-		}
-
-		enc := new(bytes.Buffer)
-		if err := params.MarshalCBOR(enc); err != nil {
-			log.Warnw("TerminateBatcher: couldn't serialize TerminateSectors params", "error", err)
-			b.lk.Unlock()
-			continue
-		}
-
-		mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
-		if err != nil {
-			log.Warnw("TerminateBatcher: couldn't get miner info", "error", err)
-			b.lk.Unlock()
-			continue
-		}
-
-		from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
-		if err != nil {
-			log.Warnw("TerminateBatcher: no good address found", "error", err)
-			b.lk.Unlock()
-			continue
-		}
-
-		mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
-		if err != nil {
-			log.Errorw("TerminateBatcher: sending message failed", "error", err)
-			b.lk.Unlock()
-			continue
-		}
-		lastMsg = &mcid
-		log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
-
-		for _, t := range params.Terminations {
-			delete(b.todo, SectorLocation{
-				Deadline:  t.Deadline,
-				Partition: t.Partition,
-			})
-		}
-
-		for _, w := range b.waiting {
-			for _, ch := range w {
-				ch <- mcid // buffered
-			}
-		}
-
-		b.waiting = map[SectorLocation][]chan cid.Cid{}
-
-		b.lk.Unlock()
 	}
 }
 
+// processBatch builds a TerminateSectors message from the queued sectors and
+// sends it, skipping partitions in the previous, current and next deadline.
+// It returns the CID of the sent message, or nil when no message was sent.
+func (b *TerminateBatcher) processBatch(notif, after bool) (*cid.Cid, error) {
+	dl, err := b.api.StateMinerProvingDeadline(b.mctx, b.maddr, nil)
+	if err != nil {
+		return nil, xerrors.Errorf("getting proving deadline info failed: %w", err)
+	}
+
+	b.lk.Lock()
+	defer b.lk.Unlock()
+	params := miner2.TerminateSectorsParams{}
+
+	var total uint64
+	for loc, sectors := range b.todo {
+		n, err := sectors.Count()
+		if err != nil {
+			log.Errorw("TerminateBatcher: failed to count sectors to terminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
+			continue
+		}
+
+		// don't send terminations for currently challenged sectors
+		if loc.Deadline == (dl.Index+1)%miner.WPoStPeriodDeadlines || // not in next (in case the terminate message takes a while to get on chain)
+			loc.Deadline == dl.Index || // not in current
+			(loc.Deadline+1)%miner.WPoStPeriodDeadlines == dl.Index { // not in previous
+			continue
+		}
+
+		if n < 1 {
+			log.Warnw("TerminateBatcher: zero sectors in bucket", "deadline", loc.Deadline, "partition", loc.Partition)
+			continue
+		}
+
+		toTerminate, err := sectors.Copy()
+		if err != nil {
+			log.Warnw("TerminateBatcher: copy sectors bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
+			continue
+		}
+
+		if total+n > uint64(miner.DeclarationsMax) {
+			n = uint64(miner.DeclarationsMax) - total
+
+			toTerminate, err = toTerminate.Slice(0, n)
+			if err != nil {
+				log.Warnw("TerminateBatcher: slice toTerminate bitfield", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
+				continue
+			}
+
+			remaining, err := bitfield.SubtractBitField(*sectors, toTerminate)
+			if err != nil {
+				log.Warnw("TerminateBatcher: sectors-toTerminate", "deadline", loc.Deadline, "partition", loc.Partition, "error", err)
+				continue
+			}
+			// only replace the queued set once the subtraction succeeded
+			*sectors = remaining
+		}
+
+		total += n
+
+		params.Terminations = append(params.Terminations, miner2.TerminationDeclaration{
+			Deadline:  loc.Deadline,
+			Partition: loc.Partition,
+			Sectors:   toTerminate,
+		})
+
+		if total >= uint64(miner.DeclarationsMax) {
+			break
+		}
+	}
+
+	if len(params.Terminations) == 0 {
+		return nil, nil // nothing to do
+	}
+
+	if notif && total < TerminateBatchMax {
+		return nil, nil
+	}
+
+	if after && total < TerminateBatchMin {
+		return nil, nil
+	}
+
+	enc := new(bytes.Buffer)
+	if err := params.MarshalCBOR(enc); err != nil {
+		return nil, xerrors.Errorf("couldn't serialize TerminateSectors params: %w", err)
+	}
+
+	mi, err := b.api.StateMinerInfo(b.mctx, b.maddr, nil)
+	if err != nil {
+		return nil, xerrors.Errorf("couldn't get miner info: %w", err)
+	}
+
+	from, _, err := b.addrSel(b.mctx, mi, api.TerminateSectorsAddr, b.feeCfg.MaxTerminateGasFee, b.feeCfg.MaxTerminateGasFee)
+	if err != nil {
+		return nil, xerrors.Errorf("no good address found: %w", err)
+	}
+
+	mcid, err := b.api.SendMsg(b.mctx, from, b.maddr, miner.Methods.TerminateSectors, big.Zero(), b.feeCfg.MaxTerminateGasFee, enc.Bytes())
+	if err != nil {
+		return nil, xerrors.Errorf("sending message failed: %w", err)
+	}
+	log.Infow("Sent TerminateSectors message", "cid", mcid, "from", from, "terminations", len(params.Terminations))
+
+	for _, t := range params.Terminations {
+		delete(b.todo, SectorLocation{
+			Deadline:  t.Deadline,
+			Partition: t.Partition,
+		})
+	}
+
+	for _, w := range b.waiting {
+		for _, ch := range w {
+			ch <- mcid // buffered
+		}
+	}
+
+	b.waiting = map[SectorLocation][]chan cid.Cid{}
+
+	return &mcid, nil
+}
+
 // register termination, wait for batch message, return message CID
 func (b *TerminateBatcher) AddTermination(ctx context.Context, s abi.SectorID) (cid.Cid, error) {
 	maddr, err := address.NewIDAddress(uint64(s.Miner))