From 94f2948020372e4004c1c48c8c5837f2b3900030 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Thu, 23 Jan 2020 16:38:01 +0100 Subject: [PATCH] sealing: implement handler for sealFailed --- api/api_storage.go | 1 + cmd/lotus-seal-worker/transfer.go | 1 + storage/miner.go | 1 + storage/sealing/checks.go | 4 ++- storage/sealing/fsm.go | 6 +++- storage/sealing/fsm_events.go | 8 +++++ storage/sealing/sealing.go | 1 + storage/sealing/states.go | 24 ++++++++++++-- storage/sealing/states_failed.go | 55 +++++++++++++++++++++++++++++++ 9 files changed, 97 insertions(+), 4 deletions(-) create mode 100644 storage/sealing/states_failed.go diff --git a/api/api_storage.go b/api/api_storage.go index f52f9f3a4..77cf32a84 100644 --- a/api/api_storage.go +++ b/api/api_storage.go @@ -70,6 +70,7 @@ var SectorStates = []string{ PreCommitFailed: "PreCommitFailed", SealCommitFailed: "SealCommitFailed", CommitFailed: "CommitFailed", + PackingFailed: "PackingFailed", FailedUnrecoverable: "FailedUnrecoverable", diff --git a/cmd/lotus-seal-worker/transfer.go b/cmd/lotus-seal-worker/transfer.go index 54ce552fe..fcd473392 100644 --- a/cmd/lotus-seal-worker/transfer.go +++ b/cmd/lotus-seal-worker/transfer.go @@ -14,6 +14,7 @@ import ( "github.com/filecoin-project/lotus/lib/tarutil" ) + func (w *worker) sizeForType(typ string) int64 { size := int64(w.sb.SectorSize()) if typ == "cache" { diff --git a/storage/miner.go b/storage/miner.go index ce4f6a9e4..3355107da 100644 --- a/storage/miner.go +++ b/storage/miner.go @@ -50,6 +50,7 @@ type storageMinerApi interface { StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sealing/checks.go b/storage/sealing/checks.go index 3ec86b9f3..491e330b0 100644 --- a/storage/sealing/checks.go +++ b/storage/sealing/checks.go @@ -12,6 +12,8 @@ import ( "github.com/filecoin-project/lotus/chain/types" ) +// TODO: For now we handle this by halting state execution, when we get jsonrpc reconnecting +// We should implement some wait-for-api logic type ErrApi error type ErrInvalidDeals error @@ -78,7 +80,7 @@ func checkSeal(ctx context.Context, maddr address.Address, si SectorInfo, api se } r, err := api.StateCall(ctx, ccmt, nil) if err != nil { - return xerrors.Errorf("calling ComputeDataCommitment: %w", err) + return ErrApi(xerrors.Errorf("calling ComputeDataCommitment: %w", err)) } if r.ExitCode != 0 { return ErrBadCommD(xerrors.Errorf("receipt for ComputeDataCommitment had exit code %d", r.ExitCode)) diff --git a/storage/sealing/fsm.go b/storage/sealing/fsm.go index fced052aa..d6bd4160e 100644 --- a/storage/sealing/fsm.go +++ b/storage/sealing/fsm.go @@ -58,6 +58,10 @@ var fsmPlanners = []func(events []statemachine.Event, state *SectorInfo) error{ on(SectorFaulty{}, api.Faulty), ), + api.SealFailed: planOne( + on(SectorRetrySeal{}, api.Unsealed), + ), + api.Faulty: planOne( on(SectorFaultReported{}, api.FaultReported), ), @@ -84,7 +88,7 @@ func (m *Sealing) plan(events []statemachine.Event, state *SectorInfo) (func(sta p := fsmPlanners[state.State] if p == nil { - return nil, xerrors.Errorf("planner for state %d not found", state.State) + return nil, xerrors.Errorf("planner for state %s not found", state.State, api.SectorStates[state.State]) } if err := p(events, state); err != nil { diff --git a/storage/sealing/fsm_events.go b/storage/sealing/fsm_events.go index d25668da2..948a1653b 100644 --- a/storage/sealing/fsm_events.go +++ b/storage/sealing/fsm_events.go @@ -120,6 +120,14 @@ type SectorProving struct{} func (evt SectorProving) apply(*SectorInfo) {} +// Failed state recovery + +type SectorRetrySeal struct{} + +func (evt SectorRetrySeal) apply(state *SectorInfo) {} + +// Faults + type SectorFaulty struct{} func (evt SectorFaulty) apply(state *SectorInfo) {} diff --git a/storage/sealing/sealing.go b/storage/sealing/sealing.go index 6d235488e..45034047e 100644 --- a/storage/sealing/sealing.go +++ b/storage/sealing/sealing.go @@ -39,6 +39,7 @@ type sealingApi interface { // TODO: trim down StateGetActor(ctx context.Context, actor address.Address, ts *types.TipSet) (*types.Actor, error) StateGetReceipt(context.Context, cid.Cid, *types.TipSet) (*types.MessageReceipt, error) StateMarketStorageDeal(context.Context, uint64, *types.TipSet) (*actors.OnChainDeal, error) + StateReadState(ctx context.Context, act *types.Actor, ts *types.TipSet) (*api.ActorState, error) MpoolPushMessage(context.Context, *types.Message) (*types.SignedMessage, error) diff --git a/storage/sealing/states.go b/storage/sealing/states.go index 754e88d04..81c3549df 100644 --- a/storage/sealing/states.go +++ b/storage/sealing/states.go @@ -45,7 +45,17 @@ func (m *Sealing) handlePacking(ctx statemachine.Context, sector SectorInfo) err func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) error { if err := checkPieces(ctx.Context(), sector, m.api); err != nil { // Sanity check state - return ctx.Send(SectorPackingFailed{xerrors.Errorf("checkPieces error: %w", err)}) + switch err.(type) { + case ErrApi: + log.Errorf("handleUnsealed: api error, not proceeding: %+v", err) + return nil + case ErrInvalidDeals: + return ctx.Send(SectorPackingFailed{xerrors.Errorf("invalid deals in sector: %w", err)}) + case ErrExpiredDeals: // Probably not much we can do here, maybe re-pack the sector? + return ctx.Send(SectorPackingFailed{xerrors.Errorf("expired deals in sector: %w", err)}) + default: + return xerrors.Errorf("checkPieces sanity check error: %w", err) + } } log.Infow("performing sector replication...", "sector", sector.SectorID) @@ -71,7 +81,17 @@ func (m *Sealing) handleUnsealed(ctx statemachine.Context, sector SectorInfo) er func (m *Sealing) handlePreCommitting(ctx statemachine.Context, sector SectorInfo) error { if err := checkSeal(ctx.Context(), m.maddr, sector, m.api); err != nil { - return ctx.Send(SectorSealFailed{xerrors.Errorf("checkPieces error: %w", err)}) + switch err.(type) { + case ErrApi: + log.Errorf("handlePreCommitting: api error, not proceeding: %+v", err) + return nil + case ErrBadCommD: // TODO: Should this just back to packing? (not really needed since handleUnsealed will do that too) + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + case ErrExpiredTicket: + return ctx.Send(SectorSealFailed{xerrors.Errorf("bad CommD error: %w", err)}) + default: + return xerrors.Errorf("checkSeal sanity check error: %w", err) + } } params := &actors.SectorPreCommitInfo{ diff --git a/storage/sealing/states_failed.go b/storage/sealing/states_failed.go new file mode 100644 index 000000000..5d125c2be --- /dev/null +++ b/storage/sealing/states_failed.go @@ -0,0 +1,55 @@ +package sealing + +import ( + "fmt" + "time" + + "github.com/filecoin-project/lotus/api" + "github.com/filecoin-project/lotus/lib/statemachine" +) + +const minRetryTime = 1 * time.Minute + +func failedCooldown(ctx statemachine.Context, sector SectorInfo) error { + retryStart := time.Unix(int64(sector.Log[len(sector.Log)-1].Timestamp), 0).Add(minRetryTime) + if len(sector.Log) > 0 && !time.Now().After(retryStart) { + log.Infof("%s(%d), waiting %s before retrying", api.SectorStates[sector.State], time.Until(retryStart)) + select { + case <-time.After(time.Until(retryStart)): + case <-ctx.Context().Done(): + return ctx.Context().Err() + } + } + + return nil +} + +func (m *Sealing) handleSealFailed(ctx statemachine.Context, sector SectorInfo) error { + // TODO: + + act, err := m.api.StateGetActor(ctx.Context(), m.maddr, nil) + if err != nil { + log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) + return nil + } + + st, err := m.api.StateReadState(ctx.Context(), act, nil) + if err != nil { + log.Errorf("handleSealFailed(%d): temp error: %+v", sector.SectorID, err) + return nil + } + + _, found := st.State.(map[string]interface{})["PreCommittedSectors"].(map[string]interface{})[fmt.Sprint(sector.SectorID)] + if found { + // TODO: If not expired yet, we can just try reusing sealticket + log.Errorf("sector found in miner preseal array: %+v", sector.SectorID, err) + return nil + } + // + + if err := failedCooldown(ctx, sector); err != nil { + return err + } + + return ctx.Send(SectorRetrySeal{}) +}