diff --git a/extern/storage-sealing/checks.go b/extern/storage-sealing/checks.go index 95ef101fa..af62b9548 100644 --- a/extern/storage-sealing/checks.go +++ b/extern/storage-sealing/checks.go @@ -26,10 +26,12 @@ type ErrBadCommD struct{ error } type ErrExpiredTicket struct{ error } type ErrBadTicket struct{ error } type ErrPrecommitOnChain struct{ error } +type ErrSectorNumberAllocated struct{ error } type ErrBadSeed struct{ error } type ErrInvalidProof struct{ error } type ErrNoPrecommit struct{ error } +type ErrCommitWaitFailed struct{ error } func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error { tok, height, err := api.ChainHead(ctx) @@ -87,6 +89,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok) if err != nil { + if err == ErrSectorAllocated { + return &ErrSectorNumberAllocated{err} + } return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)} } @@ -106,6 +111,16 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte, } pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok) + if err == ErrSectorAllocated { + // not much more we can check here, basically try to wait for commit, + // and hope that this will work + + if si.CommitMessage != nil { + return &ErrCommitWaitFailed{err} + } + + return err + } if err != nil { return xerrors.Errorf("getting precommit info: %w", err) } diff --git a/extern/storage-sealing/fsm.go b/extern/storage-sealing/fsm.go index 4842e6023..d9648a99d 100644 --- a/extern/storage-sealing/fsm.go +++ b/extern/storage-sealing/fsm.go @@ -108,6 +108,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto on(SectorRetryPreCommitWait{}, PreCommitWait), on(SectorChainPreCommitFailed{}, PreCommitFailed), on(SectorRetryPreCommit{}, PreCommitting), + on(SectorRetryCommitWait{}, CommitWait), ), FinalizeFailed: planOne( on(SectorRetryFinalize{}, FinalizeSector), @@ -317,6 +318,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error { state.State = SealPreCommit1Failed case SectorCommitFailed: state.State = CommitFailed + case SectorRetryCommitWait: + state.State = CommitWait default: return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events) } diff --git a/extern/storage-sealing/fsm_events.go b/extern/storage-sealing/fsm_events.go index c4278991e..f270b3668 100644 --- a/extern/storage-sealing/fsm_events.go +++ b/extern/storage-sealing/fsm_events.go @@ -252,6 +252,10 @@ func (evt SectorRetryInvalidProof) apply(state *SectorInfo) { state.InvalidProofs++ } +type SectorRetryCommitWait struct{} + +func (evt SectorRetryCommitWait) apply(state *SectorInfo) {} + // Faults type SectorFaulty struct{} diff --git a/extern/storage-sealing/sealing.go b/extern/storage-sealing/sealing.go index 3ae75e189..3a6bb8e5f 100644 --- a/extern/storage-sealing/sealing.go +++ b/extern/storage-sealing/sealing.go @@ -2,6 +2,7 @@ package sealing import ( "context" + "errors" "io" "math" "sync" @@ -44,9 +45,14 @@ type Config struct { WaitDealsDelay time.Duration } +var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain") + type SealingAPI interface { StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error) + StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error) + + // Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error) StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error) StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error) @@ -121,6 +127,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds toUpgrade: map[abi.SectorNumber]struct{}{}, getConfig: gc, + + stats: SectorStats{ + bySector: map[abi.SectorID]statSectorState{}, + }, } s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{}) diff --git a/extern/storage-sealing/states_failed.go b/extern/storage-sealing/states_failed.go index e208a8cca..cf829f44f 100644 --- a/extern/storage-sealing/states_failed.go +++ b/extern/storage-sealing/states_failed.go @@ -85,6 +85,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI return ctx.Send(SectorRetryPreCommit{}) case *ErrPrecommitOnChain: // noop + case *ErrSectorNumberAllocated: + log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) + // TODO: check if the sector is committed (not sure how we'd end up here) + return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) } @@ -158,6 +162,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)}) case *ErrPrecommitOnChain: // noop, this is expected + case *ErrSectorNumberAllocated: + // noop, already committed? default: return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err) } @@ -186,6 +192,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo return ctx.Send(SectorRetryPreCommitWait{}) case *ErrNoPrecommit: return ctx.Send(SectorRetryPreCommit{}) + case *ErrCommitWaitFailed: + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetryCommitWait{}) default: return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err) } diff --git a/extern/storage-sealing/states_sealing.go b/extern/storage-sealing/states_sealing.go index 2178ce0b4..110a73ac0 100644 --- a/extern/storage-sealing/states_sealing.go +++ b/extern/storage-sealing/states_sealing.go @@ -149,6 +149,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)}) case *ErrPrecommitOnChain: return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit + case *ErrSectorNumberAllocated: + log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err) + // TODO: check if the sector is committed (not sure how we'd end up here) + return nil default: return xerrors.Errorf("checkPrecommit sanity check error: %w", err) } @@ -275,6 +279,20 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er } func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error { + if sector.CommitMessage != nil { + log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber) + + ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage) + if err != nil { + log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err) + } + + if ml != nil { + // some weird retry paths can lead here + return ctx.Send(SectorRetryCommitWait{}) + } + } + log.Info("scheduling seal proof computation...") log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD) diff --git a/storage/adapter_storage_miner.go b/storage/adapter_storage_miner.go index 8881e599e..1890a369f 100644 --- a/storage/adapter_storage_miner.go +++ b/storage/adapter_storage_miner.go @@ -108,6 +108,27 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal }, nil } +func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing.MsgLookup, error) { + wmsg, err := s.delegate.StateSearchMsg(ctx, c) + if err != nil { + return nil, err + } + + if wmsg == nil { + return nil, nil + } + + return &sealing.MsgLookup{ + Receipt: sealing.MessageReceipt{ + ExitCode: wmsg.Receipt.ExitCode, + Return: wmsg.Receipt.Return, + GasUsed: wmsg.Receipt.GasUsed, + }, + TipSetTok: wmsg.TipSet.Bytes(), + Height: wmsg.Height, + }, nil +} + func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) { tsk, err := types.TipSetKeyFromBytes(tok) if err != nil { @@ -186,7 +207,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a return nil, xerrors.Errorf("checking if sector is allocated: %w", err) } if set { - return nil, xerrors.Errorf("sectorNumber is allocated") + return nil, sealing.ErrSectorAllocated } return nil, nil diff --git a/storage/miner.go b/storage/miner.go index 803556cdf..7baffee30 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -60,6 +60,7 @@ type storageMinerApi interface { StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error) StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error) + StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error) StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)