fsm: handle already-precommitted CommitFailed sectors correctly
This commit is contained in:
parent
6a49bd6d8e
commit
886d9cd5eb
15
extern/storage-sealing/checks.go
vendored
15
extern/storage-sealing/checks.go
vendored
@ -26,10 +26,12 @@ type ErrBadCommD struct{ error }
|
|||||||
type ErrExpiredTicket struct{ error }
|
type ErrExpiredTicket struct{ error }
|
||||||
type ErrBadTicket struct{ error }
|
type ErrBadTicket struct{ error }
|
||||||
type ErrPrecommitOnChain struct{ error }
|
type ErrPrecommitOnChain struct{ error }
|
||||||
|
// ErrSectorNumberAllocated is the check-level error returned by checkPrecommit
// when StateSectorPreCommitInfo fails with ErrSectorAllocated, i.e. the sector
// number is marked as allocated but no precommit info was found on chain.
type ErrSectorNumberAllocated struct{ error }
|
||||||
|
|
||||||
type ErrBadSeed struct{ error }
|
type ErrBadSeed struct{ error }
|
||||||
type ErrInvalidProof struct{ error }
|
type ErrInvalidProof struct{ error }
|
||||||
type ErrNoPrecommit struct{ error }
|
type ErrNoPrecommit struct{ error }
|
||||||
|
// ErrCommitWaitFailed is returned by checkCommit when the precommit info lookup
// fails with ErrSectorAllocated and the sector already has a CommitMessage.
// handleCommitFailed reacts to it by running failedCooldown and then sending
// SectorRetryCommitWait.
type ErrCommitWaitFailed struct{ error }
|
||||||
|
|
||||||
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
|
func checkPieces(ctx context.Context, si SectorInfo, api SealingAPI) error {
|
||||||
tok, height, err := api.ChainHead(ctx)
|
tok, height, err := api.ChainHead(ctx)
|
||||||
@ -87,6 +89,9 @@ func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, t
|
|||||||
|
|
||||||
pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok)
|
pci, err := api.StateSectorPreCommitInfo(ctx, maddr, si.SectorNumber, tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if err == ErrSectorAllocated {
|
||||||
|
return &ErrSectorNumberAllocated{err}
|
||||||
|
}
|
||||||
return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)}
|
return &ErrApi{xerrors.Errorf("getting precommit info: %w", err)}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -106,6 +111,16 @@ func (m *Sealing) checkCommit(ctx context.Context, si SectorInfo, proof []byte,
|
|||||||
}
|
}
|
||||||
|
|
||||||
pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
|
pci, err := m.api.StateSectorPreCommitInfo(ctx, m.maddr, si.SectorNumber, tok)
|
||||||
|
if err == ErrSectorAllocated {
|
||||||
|
// not much more we can check here, basically try to wait for commit,
|
||||||
|
// and hope that this will work
|
||||||
|
|
||||||
|
if si.CommitMessage != nil {
|
||||||
|
return &ErrCommitWaitFailed{err}
|
||||||
|
}
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("getting precommit info: %w", err)
|
return xerrors.Errorf("getting precommit info: %w", err)
|
||||||
}
|
}
|
||||||
|
3
extern/storage-sealing/fsm.go
vendored
3
extern/storage-sealing/fsm.go
vendored
@ -108,6 +108,7 @@ var fsmPlanners = map[SectorState]func(events []statemachine.Event, state *Secto
|
|||||||
on(SectorRetryPreCommitWait{}, PreCommitWait),
|
on(SectorRetryPreCommitWait{}, PreCommitWait),
|
||||||
on(SectorChainPreCommitFailed{}, PreCommitFailed),
|
on(SectorChainPreCommitFailed{}, PreCommitFailed),
|
||||||
on(SectorRetryPreCommit{}, PreCommitting),
|
on(SectorRetryPreCommit{}, PreCommitting),
|
||||||
|
on(SectorRetryCommitWait{}, CommitWait),
|
||||||
),
|
),
|
||||||
FinalizeFailed: planOne(
|
FinalizeFailed: planOne(
|
||||||
on(SectorRetryFinalize{}, FinalizeSector),
|
on(SectorRetryFinalize{}, FinalizeSector),
|
||||||
@ -317,6 +318,8 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
|||||||
state.State = SealPreCommit1Failed
|
state.State = SealPreCommit1Failed
|
||||||
case SectorCommitFailed:
|
case SectorCommitFailed:
|
||||||
state.State = CommitFailed
|
state.State = CommitFailed
|
||||||
|
case SectorRetryCommitWait:
|
||||||
|
state.State = CommitWait
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
return xerrors.Errorf("planCommitting got event of unknown type %T, events: %+v", event.User, events)
|
||||||
}
|
}
|
||||||
|
4
extern/storage-sealing/fsm_events.go
vendored
4
extern/storage-sealing/fsm_events.go
vendored
@ -252,6 +252,10 @@ func (evt SectorRetryInvalidProof) apply(state *SectorInfo) {
|
|||||||
state.InvalidProofs++
|
state.InvalidProofs++
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SectorRetryCommitWait is the FSM event that sends a sector back to the
// CommitWait state. It is emitted by handleCommitFailed (after an
// ErrCommitWaitFailed check result) and by handleCommitting when an existing
// commit message is found via StateSearchMsg.
type SectorRetryCommitWait struct{}
|
||||||
|
|
||||||
|
// apply is intentionally empty: the event has no fields to record on the
// SectorInfo. The move to CommitWait is made by the FSM planners
// (e.g. planCommitting), not here.
func (evt SectorRetryCommitWait) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
// Faults
|
// Faults
|
||||||
|
|
||||||
type SectorFaulty struct{}
|
type SectorFaulty struct{}
|
||||||
|
10
extern/storage-sealing/sealing.go
vendored
10
extern/storage-sealing/sealing.go
vendored
@ -2,6 +2,7 @@ package sealing
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"sync"
|
"sync"
|
||||||
@ -44,9 +45,14 @@ type Config struct {
|
|||||||
WaitDealsDelay time.Duration
|
WaitDealsDelay time.Duration
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// ErrSectorAllocated is the sentinel error that SealingAPI.StateSectorPreCommitInfo
// implementations return when no precommit info is found for a sector whose
// number is nevertheless marked as allocated.
// NOTE(review): checkPrecommit and checkCommit compare against it with ==, so
// an implementation that wraps this error would bypass those checks; confirm
// it is always returned unwrapped (or switch the callers to errors.Is).
var ErrSectorAllocated = errors.New("sectorNumber is allocated, but PreCommit info wasn't found on chain")
|
||||||
|
|
||||||
type SealingAPI interface {
|
type SealingAPI interface {
|
||||||
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
|
StateWaitMsg(context.Context, cid.Cid) (MsgLookup, error)
|
||||||
|
StateSearchMsg(context.Context, cid.Cid) (*MsgLookup, error)
|
||||||
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
|
StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok TipSetToken) (cid.Cid, error)
|
||||||
|
|
||||||
|
// Can return ErrSectorAllocated in case precommit info wasn't found, but the sector number is marked as allocated
|
||||||
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
StateSectorPreCommitInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorPreCommitOnChainInfo, error)
|
||||||
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
|
StateSectorGetInfo(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*miner.SectorOnChainInfo, error)
|
||||||
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
StateSectorPartition(ctx context.Context, maddr address.Address, sectorNumber abi.SectorNumber, tok TipSetToken) (*SectorLocation, error)
|
||||||
@ -121,6 +127,10 @@ func New(api SealingAPI, fc FeeConfig, events Events, maddr address.Address, ds
|
|||||||
|
|
||||||
toUpgrade: map[abi.SectorNumber]struct{}{},
|
toUpgrade: map[abi.SectorNumber]struct{}{},
|
||||||
getConfig: gc,
|
getConfig: gc,
|
||||||
|
|
||||||
|
stats: SectorStats{
|
||||||
|
bySector: map[abi.SectorID]statSectorState{},
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
s.sectors = statemachine.New(namespace.Wrap(ds, datastore.NewKey(SectorStorePrefix)), s, SectorInfo{})
|
||||||
|
12
extern/storage-sealing/states_failed.go
vendored
12
extern/storage-sealing/states_failed.go
vendored
@ -85,6 +85,10 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
|
|||||||
return ctx.Send(SectorRetryPreCommit{})
|
return ctx.Send(SectorRetryPreCommit{})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
// noop
|
// noop
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
|
||||||
|
// TODO: check if the sector is committed (not sure how we'd end up here)
|
||||||
|
return nil
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
@ -158,6 +162,8 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
|
|||||||
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
|
return ctx.Send(SectorChainPreCommitFailed{xerrors.Errorf("no precommit: %w", err)})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
// noop, this is expected
|
// noop, this is expected
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
// noop, already committed?
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
|
return xerrors.Errorf("checkPrecommit sanity check error (%T): %w", err, err)
|
||||||
}
|
}
|
||||||
@ -186,6 +192,12 @@ func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo
|
|||||||
return ctx.Send(SectorRetryPreCommitWait{})
|
return ctx.Send(SectorRetryPreCommitWait{})
|
||||||
case *ErrNoPrecommit:
|
case *ErrNoPrecommit:
|
||||||
return ctx.Send(SectorRetryPreCommit{})
|
return ctx.Send(SectorRetryPreCommit{})
|
||||||
|
case *ErrCommitWaitFailed:
|
||||||
|
if err := failedCooldown(ctx, sector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorRetryCommitWait{})
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
|
return xerrors.Errorf("checkCommit sanity check error (%T): %w", err, err)
|
||||||
}
|
}
|
||||||
|
18
extern/storage-sealing/states_sealing.go
vendored
18
extern/storage-sealing/states_sealing.go
vendored
@ -149,6 +149,10 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("bad ticket: %w", err)})
|
||||||
case *ErrPrecommitOnChain:
|
case *ErrPrecommitOnChain:
|
||||||
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
|
return ctx.Send(SectorPreCommitLanded{TipSet: tok}) // we re-did precommit
|
||||||
|
case *ErrSectorNumberAllocated:
|
||||||
|
log.Errorf("handlePreCommitFailed: sector number already allocated, not proceeding: %+v", err)
|
||||||
|
// TODO: check if the sector is committed (not sure how we'd end up here)
|
||||||
|
return nil
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
@ -275,6 +279,20 @@ func (m *Sealing) handleWaitSeed(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if sector.CommitMessage != nil {
|
||||||
|
log.Warnf("sector %d entered committing state with a commit message cid", sector.SectorNumber)
|
||||||
|
|
||||||
|
ml, err := m.api.StateSearchMsg(ctx.Context(), *sector.CommitMessage)
|
||||||
|
if err != nil {
|
||||||
|
log.Warnf("sector %d searching existing commit message %s: %+v", sector.SectorNumber, *sector.CommitMessage, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ml != nil {
|
||||||
|
// some weird retry paths can lead here
|
||||||
|
return ctx.Send(SectorRetryCommitWait{})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
log.Info("scheduling seal proof computation...")
|
log.Info("scheduling seal proof computation...")
|
||||||
|
|
||||||
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
log.Infof("KOMIT %d %x(%d); %x(%d); %v; r:%x; d:%x", sector.SectorNumber, sector.TicketValue, sector.TicketEpoch, sector.SeedValue, sector.SeedEpoch, sector.pieceInfos(), sector.CommR, sector.CommD)
|
||||||
|
@ -108,6 +108,27 @@ func (s SealingAPIAdapter) StateWaitMsg(ctx context.Context, mcid cid.Cid) (seal
|
|||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// StateSearchMsg looks up the message c through the delegate node API and
// converts the result into the sealing package's MsgLookup type.
//
// Errors from the delegate are returned unchanged. A nil lookup from the
// delegate is passed through as (nil, nil), so callers must check the result
// for nil before using it (presumably nil means the message was not found on
// chain; confirm against the node API documentation).
func (s SealingAPIAdapter) StateSearchMsg(ctx context.Context, c cid.Cid) (*sealing.MsgLookup, error) {
|
||||||
|
wmsg, err := s.delegate.StateSearchMsg(ctx, c)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if wmsg == nil { // no lookup result: report "nothing found" without an error
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return &sealing.MsgLookup{
|
||||||
|
Receipt: sealing.MessageReceipt{
|
||||||
|
ExitCode: wmsg.Receipt.ExitCode,
|
||||||
|
Return: wmsg.Receipt.Return,
|
||||||
|
GasUsed: wmsg.Receipt.GasUsed,
|
||||||
|
},
|
||||||
|
TipSetTok: wmsg.TipSet.Bytes(),
|
||||||
|
Height: wmsg.Height,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) {
|
func (s SealingAPIAdapter) StateComputeDataCommitment(ctx context.Context, maddr address.Address, sectorType abi.RegisteredSealProof, deals []abi.DealID, tok sealing.TipSetToken) (cid.Cid, error) {
|
||||||
tsk, err := types.TipSetKeyFromBytes(tok)
|
tsk, err := types.TipSetKeyFromBytes(tok)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -186,7 +207,7 @@ func (s SealingAPIAdapter) StateSectorPreCommitInfo(ctx context.Context, maddr a
|
|||||||
return nil, xerrors.Errorf("checking if sector is allocated: %w", err)
|
return nil, xerrors.Errorf("checking if sector is allocated: %w", err)
|
||||||
}
|
}
|
||||||
if set {
|
if set {
|
||||||
return nil, xerrors.Errorf("sectorNumber is allocated")
|
return nil, sealing.ErrSectorAllocated
|
||||||
}
|
}
|
||||||
|
|
||||||
return nil, nil
|
return nil, nil
|
||||||
|
@ -60,6 +60,7 @@ type storageMinerApi interface {
|
|||||||
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
StateMinerProvingDeadline(context.Context, address.Address, types.TipSetKey) (*miner.DeadlineInfo, error)
|
||||||
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
StateMinerPreCommitDepositForPower(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
||||||
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
StateMinerInitialPledgeCollateral(context.Context, address.Address, miner.SectorPreCommitInfo, types.TipSetKey) (types.BigInt, error)
|
||||||
|
StateSearchMsg(context.Context, cid.Cid) (*api.MsgLookup, error)
|
||||||
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually
|
StateWaitMsg(ctx context.Context, cid cid.Cid, confidence uint64) (*api.MsgLookup, error) // TODO: removeme eventually
|
||||||
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
StateGetActor(ctx context.Context, actor address.Address, ts types.TipSetKey) (*types.Actor, error)
|
||||||
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
StateGetReceipt(context.Context, cid.Cid, types.TipSetKey) (*types.MessageReceipt, error)
|
||||||
|
Loading…
Reference in New Issue
Block a user