diff --git a/storage/pipeline/fsm_test.go b/storage/pipeline/fsm_test.go
index f12b66f93..8a0ce1aed 100644
--- a/storage/pipeline/fsm_test.go
+++ b/storage/pipeline/fsm_test.go
@@ -1,14 +1,18 @@
 package sealing
 
 import (
+	"context"
 	"testing"
 
 	logging "github.com/ipfs/go-log/v2"
 	"github.com/stretchr/testify/require"
+	"golang.org/x/xerrors"
 
 	"github.com/filecoin-project/go-address"
 	"github.com/filecoin-project/go-state-types/abi"
 	"github.com/filecoin-project/go-statemachine"
+
+	"github.com/filecoin-project/lotus/storage/sealer/storiface"
 )
 
 func init() {
@@ -451,3 +455,24 @@ func TestCreationTimeCleared(t *testing.T) {
 
 	require.NotEqual(t, int64(0), m.state.CreationTime)
 }
+
+func TestRetrySoftErr(t *testing.T) {
+	i := 0
+
+	tf := func() error {
+		i++
+		switch i {
+		case 1:
+			return storiface.Err(storiface.ErrTempAllocateSpace, xerrors.New("foo"))
+		case 2:
+			return nil
+		default:
+			t.Fatalf("what")
+			return xerrors.Errorf("this error didn't ever happen, and will never happen")
+		}
+	}
+
+	err := retrySoftErr(context.Background(), tf)
+	require.NoError(t, err)
+	require.Equal(t, 2, i)
+}
diff --git a/storage/pipeline/states_sealing.go b/storage/pipeline/states_sealing.go
index 0608ead07..0e0ed565c 100644
--- a/storage/pipeline/states_sealing.go
+++ b/storage/pipeline/states_sealing.go
@@ -4,8 +4,10 @@ import (
 	"bytes"
 	"context"
 	"encoding/json"
+	"errors"
 	"io"
 	"net/http"
+	"time"
 
 	"github.com/ipfs/go-cid"
 	"golang.org/x/xerrors"
@@ -213,6 +215,46 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
 	})
 }
 
+// SoftErrRetryWait is how long retrySoftErr waits before retrying a call that
+// failed with a temporary error.
+var SoftErrRetryWait = 5 * time.Second
+
+// retrySoftErr calls cb until it returns nil or an error that is not a
+// temporary storiface.WorkError (ErrTempWorkerRestart, ErrTempAllocateSpace).
+// Between attempts it waits SoftErrRetryWait; if ctx is done during the wait,
+// ctx.Err() is returned instead of retrying.
+func retrySoftErr(ctx context.Context, cb func() error) error {
+	for {
+		err := cb()
+		if err == nil {
+			return nil
+		}
+
+		var cerr storiface.WorkError
+		if !errors.As(err, &cerr) {
+			// not a worker error, nothing to retry
+			return err
+		}
+
+		switch cerr.ErrCode() {
+		case storiface.ErrTempWorkerRestart, storiface.ErrTempAllocateSpace:
+			// temporary error, retry below
+		default:
+			// non-temp error
+			return err
+		}
+
+		// Wait before the next attempt without blocking past cancellation.
+		wait := time.NewTimer(SoftErrRetryWait)
+		select {
+		case <-ctx.Done():
+			wait.Stop()
+			return ctx.Err()
+		case <-wait.C:
+		}
+	}
+}
+
 func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
 	if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
 		switch err.(type) {
@@ -269,7 +311,11 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
 		}
 	}
 
-	pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
+	var pc1o storiface.PreCommit1Out
+	err = retrySoftErr(ctx.Context(), func() (err error) {
+		pc1o, err = m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
+		return err
+	})
 	if err != nil {
 		return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
 	}
@@ -280,7 +326,12 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
 }
 
 func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
-	cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
+	var cids storiface.SectorCids
+
+	err := retrySoftErr(ctx.Context(), func() (err error) {
+		cids, err = m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
+		return err
+	})
 	if err != nil {
 		return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
 	}
diff --git a/storage/sealer/storiface/worker.go b/storage/sealer/storiface/worker.go
index 2badad292..e84fd8aa9 100644
--- a/storage/sealer/storiface/worker.go
+++ b/storage/sealer/storiface/worker.go
@@ -186,12 +186,22 @@ const (
 	ErrTempAllocateSpace
 )
 
+// WorkError is implemented by errors that carry a storage ErrorCode.
+type WorkError interface {
+	ErrCode() ErrorCode
+}
+
 type CallError struct {
 	Code    ErrorCode
 	Message string
 	sub     error
 }
 
+// ErrCode returns the error code attached to the call error.
+func (c *CallError) ErrCode() ErrorCode {
+	return c.Code
+}
+
 func (c *CallError) Error() string {
 	return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
 }
@@ -204,6 +214,9 @@ func (c *CallError) Unwrap() error {
 	return errors.New(c.Message)
 }
 
+// Compile-time check that *CallError satisfies WorkError.
+var _ WorkError = &CallError{}
+
 func Err(code ErrorCode, sub error) *CallError {
 	return &CallError{
 		Code:    code,