sealing: implement handler for sealFailed
This commit is contained in:
parent
3ec83f2318
commit
2715ff763f
@ -12,6 +12,8 @@ import (
|
|||||||
"github.com/filecoin-project/lotus/chain/types"
|
"github.com/filecoin-project/lotus/chain/types"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting
|
||||||
|
// We should implement some wait-for-api logic
|
||||||
type ErrApi error
|
type ErrApi error
|
||||||
|
|
||||||
type ErrInvalidDeals error
|
type ErrInvalidDeals error
|
||||||
@ -78,7 +80,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se
|
|||||||
}
|
}
|
||||||
r, err := api.StateCall(ctx, ccmt, nil)
|
r, err := api.StateCall(ctx, ccmt, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return xerrors.Errorf("calling ComputeDataCommitment: %w", err)
|
return ErrApi(xerrors.Errorf("calling ComputeDataCommitment: %w", err))
|
||||||
}
|
}
|
||||||
if r.ExitCode != 0 {
|
if r.ExitCode != 0 {
|
||||||
return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode))
|
return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode))
|
||||||
|
6
fsm.go
6
fsm.go
@ -58,6 +58,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
|
|||||||
on(SectorFaulty{}, api.Faulty),
|
on(SectorFaulty{}, api.Faulty),
|
||||||
),
|
),
|
||||||
|
|
||||||
|
api.SealFailed: planOne(
|
||||||
|
on(SectorRetrySeal{}, api.Unsealed),
|
||||||
|
),
|
||||||
|
|
||||||
api.Faulty: planOne(
|
api.Faulty: planOne(
|
||||||
on(SectorFaultReported{}, api.FaultReported),
|
on(SectorFaultReported{}, api.FaultReported),
|
||||||
),
|
),
|
||||||
@ -84,7 +88,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
|||||||
|
|
||||||
p := fsmPlanners[state.State]
|
p := fsmPlanners[state.State]
|
||||||
if p == nil {
|
if p == nil {
|
||||||
return nil, xerrors.Errorf("planner for state %d not found", state.State)
|
return nil, xerrors.Errorf("planner for state %s not found", state.State, api.SectorStates[state.State])
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := p(events, state); err != nil {
|
if err := p(events, state); err != nil {
|
||||||
|
@ -120,6 +120,14 @@ type SectorProving struct{}
|
|||||||
|
|
||||||
func (evt SectorProving) apply(*SectorInfo) {}
|
func (evt SectorProving) apply(*SectorInfo) {}
|
||||||
|
|
||||||
|
// Failed state recovery
|
||||||
|
|
||||||
|
type SectorRetrySeal struct{}
|
||||||
|
|
||||||
|
func (evt SectorRetrySeal) apply(state *SectorInfo) {}
|
||||||
|
|
||||||
|
// Faults
|
||||||
|
|
||||||
type SectorFaulty struct{}
|
type SectorFaulty struct{}
|
||||||
|
|
||||||
func (evt SectorFaulty) apply(state *SectorInfo) {}
|
func (evt SectorFaulty) apply(state *SectorInfo) {}
|
||||||
|
@ -39,6 +39,7 @@ type sealingApi interface { // TODO: trim down
|
|||||||
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||||
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||||
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
|
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
|
||||||
|
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error)
|
||||||
|
|
||||||
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||||
|
|
||||||
|
24
states.go
24
states.go
@ -45,7 +45,17 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
|||||||
|
|
||||||
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
|
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
|
||||||
return ctx.Send(SectorPackingFailed{xerrors.Errorf("checkPieces error: %w", err)})
|
switch err.(type) {
|
||||||
|
case ErrApi:
|
||||||
|
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
case ErrInvalidDeals:
|
||||||
|
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
|
||||||
|
case ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
|
||||||
|
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)})
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("checkPieces sanity check error: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||||
@ -71,7 +81,17 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
|
|||||||
|
|
||||||
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
||||||
return ctx.Send(SectorSealFailed{xerrors.Errorf("checkPieces error: %w", err)})
|
switch err.(type) {
|
||||||
|
case ErrApi:
|
||||||
|
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
||||||
|
return nil
|
||||||
|
case ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
|
||||||
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
||||||
|
case ErrExpiredTicket:
|
||||||
|
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
||||||
|
default:
|
||||||
|
return xerrors.Errorf("checkSeal sanity check error: %w", err)
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
params := &actors.SectorPreCommitInfo{
|
params := &actors.SectorPreCommitInfo{
|
||||||
|
55
states_failed.go
Normal file
55
states_failed.go
Normal file
@ -0,0 +1,55 @@
|
|||||||
|
package sealing
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/api"
|
||||||
|
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||||
|
)
|
||||||
|
|
||||||
|
const minRetryTime = 1 * time.Minute
|
||||||
|
|
||||||
|
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
|
||||||
|
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
||||||
|
log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], time.Until(retryStart))
|
||||||
|
select {
|
||||||
|
case <-time.After(time.Until(retryStart)):
|
||||||
|
case <-ctx.Context().Done():
|
||||||
|
return ctx.Context().Err()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
|
// TODO: <Remove this section after we can re-precommit>
|
||||||
|
|
||||||
|
act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
st, err := m.api.StateReadState(ctx.Context(), act, nil)
|
||||||
|
if err != nil {
|
||||||
|
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
_, found := st.State.(map[string]interface{})["PreCommittedSectors"].(map[string]interface{})[fmt.Sprint(sector.SectorID)]
|
||||||
|
if found {
|
||||||
|
// TODO: If not expired yet, we can just try reusing sealticket
|
||||||
|
log.Errorf("sector found in miner preseal array: %+v", sector.SectorID, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
// </>
|
||||||
|
|
||||||
|
if err := failedCooldown(ctx, sector); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return ctx.Send(SectorRetrySeal{})
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user