diff --git a/storage/pipeline/cbor_gen.go b/storage/pipeline/cbor_gen.go index d14611c6a..57a668ae6 100644 --- a/storage/pipeline/cbor_gen.go +++ b/storage/pipeline/cbor_gen.go @@ -31,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { cw := cbg.NewCborWriter(w) - if _, err := cw.Write([]byte{184, 38}); err != nil { + if _, err := cw.Write([]byte{184, 39}); err != nil { return err } @@ -565,6 +565,22 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error { } } + // t.PreCommit1Fails (uint64) (uint64) + if len("PreCommit1Fails") > cbg.MaxLength { + return xerrors.Errorf("Value in field \"PreCommit1Fails\" was too long") + } + + if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("PreCommit1Fails"))); err != nil { + return err + } + if _, err := io.WriteString(w, string("PreCommit1Fails")); err != nil { + return err + } + + if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PreCommit1Fails)); err != nil { + return err + } + // t.PreCommit2Fails (uint64) (uint64) if len("PreCommit2Fails") > cbg.MaxLength { return xerrors.Errorf("Value in field \"PreCommit2Fails\" was too long") @@ -1402,6 +1418,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) { t.UpdateUnsealed = &c } + } + // t.PreCommit1Fails (uint64) (uint64) + case "PreCommit1Fails": + + { + + maj, extra, err = cr.ReadHeader() + if err != nil { + return err + } + if maj != cbg.MajUnsignedInt { + return fmt.Errorf("wrong type for uint64 field") + } + t.PreCommit1Fails = uint64(extra) + } // t.PreCommit2Fails (uint64) (uint64) case "PreCommit2Fails": diff --git a/storage/pipeline/fsm_events.go b/storage/pipeline/fsm_events.go index 122691ca3..a798a884b 100644 --- a/storage/pipeline/fsm_events.go +++ b/storage/pipeline/fsm_events.go @@ -182,6 +182,8 @@ func (evt SectorSealPreCommit1Failed) FormatError(xerrors.Printer) (next error) func (evt SectorSealPreCommit1Failed) apply(si *SectorInfo) { si.InvalidProofs = 0 // reset counter si.PreCommit2Fails = 0 + + 
si.PreCommit1Fails++
 }
 
 type SectorSealPreCommit2Failed struct{ error }
diff --git a/storage/pipeline/fsm_test.go b/storage/pipeline/fsm_test.go
index 4dfc8548d..7d7201953 100644
--- a/storage/pipeline/fsm_test.go
+++ b/storage/pipeline/fsm_test.go
@@ -1,14 +1,19 @@
 package sealing
 
 import (
+	"context"
 	"testing"
+	"time"
 
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-statemachine"
+
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 )
 
 func init() {
@@ -451,3 +456,29 @@ func TestCreationTimeCleared(t *testing.T) {
 
 	require.NotEqual(t, int64(0), m.state.CreationTime)
 }
+
+// TestRetrySoftErr checks that retrySoftErr calls the callback again after a
+// temporary worker error and returns nil once the callback succeeds.
+func TestRetrySoftErr(t *testing.T) {
+	// the callback fails once, so retrySoftErr waits SoftErrRetryWait once;
+	// shorten it so the test doesn't block for the default interval
+	prevWait := SoftErrRetryWait
+	SoftErrRetryWait = time.Millisecond
+	defer func() { SoftErrRetryWait = prevWait }()
+
+	i := 0
+
+	tf := func() error {
+		i++
+		switch i {
+		case 1:
+			return storiface.Err(storiface.ErrTempAllocateSpace, xerrors.New("foo"))
+		case 2:
+			return nil
+		default:
+			t.Fatalf("what")
+			return xerrors.Errorf("this error didn't ever happen, and will never happen")
+		}
+	}
+
+	err := retrySoftErr(context.Background(), tf)
+	require.NoError(t, err)
+	require.Equal(t, 2, i)
+}
diff --git a/storage/pipeline/states_failed.go b/storage/pipeline/states_failed.go
index 8bad4cee3..203f14910 100644
--- a/storage/pipeline/states_failed.go
+++ b/storage/pipeline/states_failed.go
@@ -54,7 +54,13 @@ func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo)
 	return info, true
 }
 
+var MaxPreCommit1Retries = uint64(3)
+
 func (m *Sealing) handleSealPrecommit1Failed(ctx statemachine.Context, sector SectorInfo) error {
+	if sector.PreCommit1Fails > MaxPreCommit1Retries {
+		return ctx.Send(SectorRemove{})
+	}
+
 	if err := failedCooldown(ctx, sector); err != nil {
 		return err
 	}
diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go
index 1d7b36fe8..48d024f85 100644
--- a/storage/pipeline/states_sealing.go
+++ b/storage/pipeline/states_sealing.go
@@ -4,8 
+4,10 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"io"
 	"net/http"
+	"time"
 
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
@@ -213,6 +215,51 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
 	})
 }
 
+// SoftErrRetryWait is the pause between attempts made by retrySoftErr.
+var SoftErrRetryWait = 5 * time.Second
+
+// retrySoftErr calls cb until it returns nil or an error which is not a
+// temporary storiface.WorkError (ErrTempWorkerRestart, ErrTempAllocateSpace).
+// Between attempts it waits SoftErrRetryWait; if ctx is cancelled before or
+// during that wait, ctx.Err() is returned instead of retrying.
+func retrySoftErr(ctx context.Context, cb func() error) error {
+	for {
+		err := cb()
+		if err == nil {
+			return nil
+		}
+
+		var cerr storiface.WorkError
+		if !errors.As(err, &cerr) {
+			// no error code to inspect, treat as permanent
+			return err
+		}
+
+		switch cerr.ErrCode() {
+		case storiface.ErrTempWorkerRestart, storiface.ErrTempAllocateSpace:
+			// retry
+		default:
+			// non-temp error
+			return err
+		}
+
+		// check if the context got cancelled early
+		if ctx.Err() != nil {
+			return ctx.Err()
+		}
+
+		// wait before retrying, but stop waiting as soon as ctx is cancelled;
+		// a plain time.Sleep would hold up the caller for the whole interval
+		wait := time.NewTimer(SoftErrRetryWait)
+		select {
+		case <-ctx.Done():
+			wait.Stop()
+			return ctx.Err()
+		case <-wait.C:
+		}
+	}
+}
+
 func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
 	if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
 		switch err.(type) {
@@ -269,7 +316,11 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
 		}
 	}
 
-	pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
+	var pc1o storiface.PreCommit1Out
+	err = retrySoftErr(ctx.Context(), func() (err error) {
+		pc1o, err = m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
+		return err
+	})
 	if err != nil {
 		return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
 	}
@@ -280,7 +331,12 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
 }
 
 func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
-	cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), 
m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out) + var cids storiface.SectorCids + + err := retrySoftErr(ctx.Context(), func() (err error) { + cids, err = m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out) + return err + }) if err != nil { return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)}) } diff --git a/storage/pipeline/types.go b/storage/pipeline/types.go index 6329b5666..e752eb2b9 100644 --- a/storage/pipeline/types.go +++ b/storage/pipeline/types.go @@ -56,6 +56,8 @@ type SectorInfo struct { TicketEpoch abi.ChainEpoch PreCommit1Out storiface.PreCommit1Out + PreCommit1Fails uint64 + // PreCommit2 CommD *cid.Cid CommR *cid.Cid // SectorKey diff --git a/storage/sealer/storiface/worker.go b/storage/sealer/storiface/worker.go index 2badad292..e84fd8aa9 100644 --- a/storage/sealer/storiface/worker.go +++ b/storage/sealer/storiface/worker.go @@ -186,12 +186,20 @@ const ( ErrTempAllocateSpace ) +type WorkError interface { + ErrCode() ErrorCode +} + type CallError struct { Code ErrorCode Message string sub error } +func (c *CallError) ErrCode() ErrorCode { + return c.Code +} + func (c *CallError) Error() string { return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message) } @@ -204,6 +212,8 @@ func (c *CallError) Unwrap() error { return errors.New(c.Message) } +var _ WorkError = &CallError{} + func Err(code ErrorCode, sub error) *CallError { return &CallError{ Code: code,