sealing: implement handler for sealFailed
This commit is contained in:
parent
f5540195de
commit
94f2948020
@ -70,6 +70,7 @@ var SectorStates = []string{
|
||||
PreCommitFailed: "PreCommitFailed",
|
||||
SealCommitFailed: "SealCommitFailed",
|
||||
CommitFailed: "CommitFailed",
|
||||
PackingFailed: "PackingFailed",
|
||||
|
||||
FailedUnrecoverable: "FailedUnrecoverable",
|
||||
|
||||
|
@ -14,6 +14,7 @@ import (
|
||||
|
||||
"github.com/filecoin-project/lotus/lib/tarutil"
|
||||
)
|
||||
|
||||
func (w *worker) sizeForType(typ string) int64 {
|
||||
size := int64(w.sb.SectorSize())
|
||||
if typ == "cache" {
|
||||
|
@ -50,6 +50,7 @@ type storageMinerApi interface {
|
||||
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
|
||||
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error)
|
||||
|
||||
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||
|
||||
|
@ -12,6 +12,8 @@ import (
|
||||
"github.com/filecoin-project/lotus/chain/types"
|
||||
)
|
||||
|
||||
// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting
|
||||
// We should implement some wait-for-api logic
|
||||
type ErrApi error
|
||||
|
||||
type ErrInvalidDeals error
|
||||
@ -78,7 +80,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se
|
||||
}
|
||||
r, err := api.StateCall(ctx, ccmt, nil)
|
||||
if err != nil {
|
||||
return xerrors.Errorf("calling ComputeDataCommitment: %w", err)
|
||||
return ErrApi(xerrors.Errorf("calling ComputeDataCommitment: %w", err))
|
||||
}
|
||||
if r.ExitCode != 0 {
|
||||
return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode))
|
||||
|
@ -58,6 +58,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{
|
||||
on(SectorFaulty{}, api.Faulty),
|
||||
),
|
||||
|
||||
api.SealFailed: planOne(
|
||||
on(SectorRetrySeal{}, api.Unsealed),
|
||||
),
|
||||
|
||||
api.Faulty: planOne(
|
||||
on(SectorFaultReported{}, api.FaultReported),
|
||||
),
|
||||
@ -84,7 +88,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta
|
||||
|
||||
p := fsmPlanners[state.State]
|
||||
if p == nil {
|
||||
return nil, xerrors.Errorf("planner for state %d not found", state.State)
|
||||
return nil, xerrors.Errorf("planner for state %s not found", state.State, api.SectorStates[state.State])
|
||||
}
|
||||
|
||||
if err := p(events, state); err != nil {
|
||||
|
@ -120,6 +120,14 @@ type SectorProving struct{}
|
||||
|
||||
func (evt SectorProving) apply(*SectorInfo) {}
|
||||
|
||||
// Failed state recovery
|
||||
|
||||
type SectorRetrySeal struct{}
|
||||
|
||||
func (evt SectorRetrySeal) apply(state *SectorInfo) {}
|
||||
|
||||
// Faults
|
||||
|
||||
type SectorFaulty struct{}
|
||||
|
||||
func (evt SectorFaulty) apply(state *SectorInfo) {}
|
||||
|
@ -39,6 +39,7 @@ type sealingApi interface { // TODO: trim down
|
||||
StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error)
|
||||
StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error)
|
||||
StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error)
|
||||
StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error)
|
||||
|
||||
MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error)
|
||||
|
||||
|
@ -45,7 +45,17 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err
|
||||
|
||||
func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state
|
||||
return ctx.Send(SectorPackingFailed{xerrors.Errorf("checkPieces error: %w", err)})
|
||||
switch err.(type) {
|
||||
case ErrApi:
|
||||
log.Errorf("handleUnsealed: api error, not proceeding: %+v", err)
|
||||
return nil
|
||||
case ErrInvalidDeals:
|
||||
return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)})
|
||||
case ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector?
|
||||
return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)})
|
||||
default:
|
||||
return xerrors.Errorf("checkPieces sanity check error: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
log.Infow("performing sector replication...", "sector", sector.SectorID)
|
||||
@ -71,7 +81,17 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er
|
||||
|
||||
func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil {
|
||||
return ctx.Send(SectorSealFailed{xerrors.Errorf("checkPieces error: %w", err)})
|
||||
switch err.(type) {
|
||||
case ErrApi:
|
||||
log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err)
|
||||
return nil
|
||||
case ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too)
|
||||
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
||||
case ErrExpiredTicket:
|
||||
return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)})
|
||||
default:
|
||||
return xerrors.Errorf("checkSeal sanity check error: %w", err)
|
||||
}
|
||||
}
|
||||
|
||||
params := &actors.SectorPreCommitInfo{
|
||||
|
55
storage/sealing/states_failed.go
Normal file
55
storage/sealing/states_failed.go
Normal file
@ -0,0 +1,55 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"github.com/filecoin-project/lotus/api"
|
||||
"github.com/filecoin-project/lotus/lib/statemachine"
|
||||
)
|
||||
|
||||
const minRetryTime = 1 * time.Minute
|
||||
|
||||
func failedCooldown(ctx statemachine.Context, sector SectorInfo) error {
|
||||
retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime)
|
||||
if len(sector.Log) > 0 && !time.Now().After(retryStart) {
|
||||
log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], time.Until(retryStart))
|
||||
select {
|
||||
case <-time.After(time.Until(retryStart)):
|
||||
case <-ctx.Context().Done():
|
||||
return ctx.Context().Err()
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
// TODO: <Remove this section after we can re-precommit>
|
||||
|
||||
act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil)
|
||||
if err != nil {
|
||||
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
st, err := m.api.StateReadState(ctx.Context(), act, nil)
|
||||
if err != nil {
|
||||
log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err)
|
||||
return nil
|
||||
}
|
||||
|
||||
_, found := st.State.(map[string]interface{})["PreCommittedSectors"].(map[string]interface{})[fmt.Sprint(sector.SectorID)]
|
||||
if found {
|
||||
// TODO: If not expired yet, we can just try reusing sealticket
|
||||
log.Errorf("sector found in miner preseal array: %+v", sector.SectorID, err)
|
||||
return nil
|
||||
}
|
||||
// </>
|
||||
|
||||
if err := failedCooldown(ctx, sector); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return ctx.Send(SectorRetrySeal{})
|
||||
}
|
Loading…
Reference in New Issue
Block a user