fsm: Implement handlers for Commit errors
This commit is contained in:
parent
a63a0b3077
commit
6284fad33e
@ -35,7 +35,7 @@ const (
|
|||||||
FailedUnrecoverable SectorState = "FailedUnrecoverable"
|
FailedUnrecoverable SectorState = "FailedUnrecoverable"
|
||||||
SealFailed SectorState = "SealFailed"
|
SealFailed SectorState = "SealFailed"
|
||||||
PreCommitFailed SectorState = "PreCommitFailed"
|
PreCommitFailed SectorState = "PreCommitFailed"
|
||||||
SealCommitFailed SectorState = "SealCommitFailed"
|
ComputeProofFailed SectorState = "ComputeProofFailed"
|
||||||
CommitFailed SectorState = "CommitFailed"
|
CommitFailed SectorState = "CommitFailed"
|
||||||
PackingFailed SectorState = "PackingFailed"
|
PackingFailed SectorState = "PackingFailed"
|
||||||
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
|
Faulty SectorState = "Faulty" // sector is corrupted or gone for some reason
|
||||||
|
@ -60,10 +60,10 @@ const FallbackPoStConfidence = 6
|
|||||||
const SealRandomnessLookback = Finality
|
const SealRandomnessLookback = Finality
|
||||||
|
|
||||||
// Epochs
|
// Epochs
|
||||||
const SealRandomnessLookbackLimit = SealRandomnessLookback + 2000
|
const SealRandomnessLookbackLimit = SealRandomnessLookback + 2000 // TODO: Get from specs-actors
|
||||||
|
|
||||||
// Maximum lookback that randomness can be sourced from for a seal proof submission
|
// Maximum lookback that randomness can be sourced from for a seal proof submission
|
||||||
const MaxSealLookback = SealRandomnessLookbackLimit + 2000
|
const MaxSealLookback = SealRandomnessLookbackLimit + 2000 // TODO: Get from specs-actors
|
||||||
|
|
||||||
// /////
|
// /////
|
||||||
// Mining
|
// Mining
|
||||||
|
@ -11,6 +11,7 @@ import (
|
|||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin"
|
"github.com/filecoin-project/specs-actors/actors/builtin"
|
||||||
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
"github.com/filecoin-project/specs-actors/actors/builtin/market"
|
||||||
|
"github.com/filecoin-project/specs-actors/actors/crypto"
|
||||||
|
|
||||||
"github.com/filecoin-project/lotus/build"
|
"github.com/filecoin-project/lotus/build"
|
||||||
"github.com/filecoin-project/lotus/chain/actors"
|
"github.com/filecoin-project/lotus/chain/actors"
|
||||||
@ -29,6 +30,8 @@ type ErrExpiredDeals struct{ error }
|
|||||||
type ErrBadCommD struct{ error }
|
type ErrBadCommD struct{ error }
|
||||||
type ErrExpiredTicket struct{ error }
|
type ErrExpiredTicket struct{ error }
|
||||||
|
|
||||||
|
// ErrBadSeed wraps an error reported by checkCommit when the sector's
// interactive seed is unusable: either the seed epoch was never set, or the
// randomness currently on chain no longer matches the stored seed value.
type ErrBadSeed struct{ error }
|
||||||
|
|
||||||
// checkPieces validates that:
|
// checkPieces validates that:
|
||||||
// - Each piece has a corresponding on chain deal
|
// - Each piece has a corresponding on chain deal
|
||||||
// - Piece commitments match with on chain deals
|
// - Piece commitments match with on chain deals
|
||||||
@ -69,9 +72,9 @@ func checkPieces(ctx context.Context, si SectorInfo, api sealingApi) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkSeal checks that data commitment generated in the sealing process
|
// checkPrecommit checks that data commitment generated in the sealing process
|
||||||
// matches pieces, and that the seal ticket isn't expired
|
// matches pieces, and that the seal ticket isn't expired
|
||||||
func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) {
|
func checkPrecommit(ctx context.Context, maddr address.Address, si SectorInfo, api sealingApi) (err error) {
|
||||||
head, err := api.ChainHead(ctx)
|
head, err := api.ChainHead(ctx)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
|
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
|
||||||
@ -116,5 +119,26 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se
|
|||||||
}
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// checkCommit verifies that the interactive seed recorded for the sector is
// still the one the chain would produce.
//
// It fetches the current chain head, rejects a zero seed epoch, re-reads the
// InteractiveSealChallengeSeed randomness for si.Seed.Epoch at that head, and
// compares it byte-for-byte with si.Seed.Value.
//
// Returned error types:
//   - *ErrApi: ChainHead or ChainGetRandomness failed.
//   - *ErrBadSeed: the seed epoch is 0, or the randomness differs from the
//     stored seed value.
//
// A nil return means the stored seed matches the chain.
func checkCommit(ctx context.Context, si SectorInfo, api sealingApi) (err error) {
|
||||||
|
head, err := api.ChainHead(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return &ErrApi{xerrors.Errorf("getting chain head: %w", err)}
|
||||||
|
}
|
||||||
|

||||||
|
if si.Seed.Epoch == 0 {
|
||||||
|
return &ErrBadSeed{xerrors.Errorf("seed epoch was not set")}
|
||||||
|
}
|
||||||
|

||||||
|
rand, err := api.ChainGetRandomness(ctx, head.Key(), crypto.DomainSeparationTag_InteractiveSealChallengeSeed, si.Seed.Epoch, nil)
|
||||||
|
if err != nil {
|
||||||
|
return &ErrApi{xerrors.Errorf("failed to get randomness for computing seal proof: %w", err)}
|
||||||
|
}
|
||||||
|

||||||
|
if string(rand) != string(si.Seed.Value) {
|
||||||
|
return &ErrBadSeed{xerrors.Errorf("seed has changed")}
|
||||||
|
}
|
||||||
|

||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
@ -77,6 +77,14 @@ var fsmPlanners = map[api.SectorState]func(events []statemachine.Event, state *S
|
|||||||
on(SectorRetryWaitSeed{}, api.WaitSeed),
|
on(SectorRetryWaitSeed{}, api.WaitSeed),
|
||||||
on(SectorSealPreCommitFailed{}, api.SealFailed),
|
on(SectorSealPreCommitFailed{}, api.SealFailed),
|
||||||
),
|
),
|
||||||
|
api.ComputeProofFailed: planOne(
|
||||||
|
on(SectorRetryComputeProof{}, api.Committing),
|
||||||
|
),
|
||||||
|
api.CommitFailed: planOne(
|
||||||
|
on(SectorSealPreCommitFailed{}, api.SealFailed),
|
||||||
|
on(SectorRetryWaitSeed{}, api.WaitSeed),
|
||||||
|
on(SectorRetryComputeProof{}, api.Committing),
|
||||||
|
),
|
||||||
|
|
||||||
api.Faulty: planOne(
|
api.Faulty: planOne(
|
||||||
on(SectorFaultReported{}, api.FaultReported),
|
on(SectorFaultReported{}, api.FaultReported),
|
||||||
@ -129,15 +137,20 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
| |
|
| |
|
||||||
| v
|
| v
|
||||||
*<- PreCommit1 <--> SealFailed
|
*<- PreCommit1 <--> SealFailed
|
||||||
| |
|
| | ^^^
|
||||||
| v
|
| v |||
|
||||||
* PreCommitting <--> PreCommitFailed
|
*<- PreCommit2 -------/||
|
||||||
| | ^
|
| | ||
|
||||||
| v |
|
| v /-------/|
|
||||||
*<- WaitSeed ----------/
|
* PreCommitting <-----+---> PreCommitFailed
|
||||||
| |||
|
| | | ^
|
||||||
| vvv v--> SealCommitFailed
|
| v | |
|
||||||
*<- Committing
|
*<- WaitSeed -----------+-----/
|
||||||
|
| ||| ^ |
|
||||||
|
| ||| \--------*-----/
|
||||||
|
| ||| |
|
||||||
|
| vvv v----+----> ComputeProofFailed
|
||||||
|
*<- Committing |
|
||||||
| | ^--> CommitFailed
|
| | ^--> CommitFailed
|
||||||
| v ^
|
| v ^
|
||||||
*<- CommitWait ---/
|
*<- CommitWait ---/
|
||||||
@ -181,10 +194,10 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
return m.handleSealFailed, nil
|
return m.handleSealFailed, nil
|
||||||
case api.PreCommitFailed:
|
case api.PreCommitFailed:
|
||||||
return m.handlePreCommitFailed, nil
|
return m.handlePreCommitFailed, nil
|
||||||
case api.SealCommitFailed:
|
case api.ComputeProofFailed:
|
||||||
log.Warnf("sector %d entered unimplemented state 'SealCommitFailed'", state.SectorID)
|
return m.handleComputeProofFailed, nil
|
||||||
case api.CommitFailed:
|
case api.CommitFailed:
|
||||||
log.Warnf("sector %d entered unimplemented state 'CommitFailed'", state.SectorID)
|
return m.handleCommitFailed, nil
|
||||||
|
|
||||||
// Faults
|
// Faults
|
||||||
case api.Faulty:
|
case api.Faulty:
|
||||||
@ -224,7 +237,7 @@ func planCommitting(events []statemachine.Event, state *SectorInfo) error {
|
|||||||
state.State = api.Committing
|
state.State = api.Committing
|
||||||
return nil
|
return nil
|
||||||
case SectorComputeProofFailed:
|
case SectorComputeProofFailed:
|
||||||
state.State = api.SealCommitFailed
|
state.State = api.ComputeProofFailed
|
||||||
case SectorSealPreCommitFailed:
|
case SectorSealPreCommitFailed:
|
||||||
state.State = api.CommitFailed
|
state.State = api.CommitFailed
|
||||||
case SectorCommitFailed:
|
case SectorCommitFailed:
|
||||||
|
@ -166,6 +166,10 @@ type SectorRetryWaitSeed struct{}
|
|||||||
|
|
||||||
func (evt SectorRetryWaitSeed) apply(state *SectorInfo) {}
|
func (evt SectorRetryWaitSeed) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
// SectorRetryComputeProof is the event sent by the ComputeProofFailed and
// CommitFailed handlers to retry proof computation; the planner maps it to the
// Committing state from both of those states.
type SectorRetryComputeProof struct{}
|
||||||
|
|
||||||
|
// apply is a no-op: the event carries no data to record on the sector.
func (evt SectorRetryComputeProof) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
// Faults
|
// Faults
|
||||||
|
|
||||||
type SectorFaulty struct{}
|
type SectorFaulty struct{}
|
||||||
|
@ -94,7 +94,7 @@ func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case *ErrApi:
|
case *ErrApi:
|
||||||
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
||||||
@ -104,7 +104,7 @@ func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInf
|
|||||||
case *ErrExpiredTicket:
|
case *ErrExpiredTicket:
|
||||||
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)})
|
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired: %w", err)})
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkSeal sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,6 +17,8 @@ import (
|
|||||||
const minRetryTime = 1 * time.Minute
|
const minRetryTime = 1 * time.Minute
|
||||||
|
|
||||||
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: Exponential backoff when we see consecutive failures
|
||||||
|
|
||||||
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
|
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
|
||||||
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
||||||
log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorID, time.Until(retryStart))
|
log.Infof("%s(%d), waiting %s before retrying", sector.State, sector.SectorID, time.Until(retryStart))
|
||||||
@ -74,7 +76,7 @@ func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
case *ErrApi:
|
case *ErrApi:
|
||||||
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
|
log.Errorf("handlePreCommitFailed: api error, not proceeding: %+v", err)
|
||||||
@ -84,7 +86,7 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
|
|||||||
case *ErrExpiredTicket:
|
case *ErrExpiredTicket:
|
||||||
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)})
|
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)})
|
||||||
default:
|
default:
|
||||||
return xerrors.Errorf("checkSeal sanity check error: %w", err)
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -119,3 +121,50 @@ func (m *Sealing) handlePreCommitFailed(ctx statemachine.Context, sector SectorI
|
|||||||
|
|
||||||
return ctx.Send(SectorRetryPreCommit{})
|
return ctx.Send(SectorRetryPreCommit{})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleComputeProofFailed is the handler for the ComputeProofFailed state.
// It runs failedCooldown for the sector (propagating any error it returns)
// and then sends SectorRetryComputeProof, which the planner maps back to the
// Committing state. No sector-file validation is done yet (see TODO).
func (m *Sealing) handleComputeProofFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: Check sector files
|
||||||
|

||||||
|
if err := failedCooldown(ctx, sector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|

||||||
|
return ctx.Send(SectorRetryComputeProof{})
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleCommitFailed is the handler for the CommitFailed state. It re-runs
// the precommit and commit sanity checks to decide how far back the sector
// has to go before retrying:
//
//   - checkPrecommit fails with *ErrBadCommD or *ErrExpiredTicket: send
//     SectorSealPreCommitFailed (planned as CommitFailed -> SealFailed).
//   - checkCommit fails with *ErrBadSeed: log and send SectorRetryWaitSeed
//     (planned as CommitFailed -> WaitSeed).
//   - either check fails with *ErrApi: log the error and return nil without
//     sending any event, so the sector stays in its current state.
//   - any other error type from either check is returned wrapped.
//
// If both checks pass, it runs failedCooldown and then sends
// SectorRetryComputeProof (planned as CommitFailed -> Committing).
func (m *Sealing) handleCommitFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
if err := checkPrecommit(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case *ErrApi:
|
||||||
|
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
case *ErrBadCommD:
|
||||||
|
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
||||||
|
case *ErrExpiredTicket:
|
||||||
|
return ctx.Send(SectorSealPreCommitFailed{xerrors.Errorf("ticket expired error: %w", err)})
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("checkPrecommit sanity check error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|

||||||
|
if err := checkCommit(ctx.Context(), sector, m.api); err != nil {
|
||||||
|
switch err.(type) {
|
||||||
|
case *ErrApi:
|
||||||
|
log.Errorf("handleCommitFailed: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
case *ErrBadSeed:
|
||||||
|
log.Errorf("seed changed, will retry: %+v", err)
|
||||||
|
return ctx.Send(SectorRetryWaitSeed{})
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("checkCommit sanity check error: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|

||||||
|
// TODO: Check sector files
|
||||||
|

||||||
|
if err := failedCooldown(ctx, sector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|

||||||
|
return ctx.Send(SectorRetryComputeProof{})
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user