sealing pipeline: Better retry for soft errors
This commit is contained in:
parent
d127208b85
commit
0a83896589
@ -1,14 +1,18 @@
|
|||||||
package sealing
|
package sealing
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
logging "github.com/ipfs/go-log/v2"
|
logging "github.com/ipfs/go-log/v2"
|
||||||
"github.com/stretchr/testify/require"
|
"github.com/stretchr/testify/require"
|
||||||
|
"golang.org/x/xerrors"
|
||||||
|
|
||||||
"github.com/filecoin-project/go-address"
|
"github.com/filecoin-project/go-address"
|
||||||
"github.com/filecoin-project/go-state-types/abi"
|
"github.com/filecoin-project/go-state-types/abi"
|
||||||
"github.com/filecoin-project/go-statemachine"
|
"github.com/filecoin-project/go-statemachine"
|
||||||
|
|
||||||
|
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||||
)
|
)
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
@ -451,3 +455,24 @@ func TestCreationTimeCleared(t *testing.T) {
|
|||||||
|
|
||||||
require.NotEqual(t, int64(0), m.state.CreationTime)
|
require.NotEqual(t, int64(0), m.state.CreationTime)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestRetrySoftErr(t *testing.T) {
|
||||||
|
i := 0
|
||||||
|
|
||||||
|
tf := func() error {
|
||||||
|
i++
|
||||||
|
switch i {
|
||||||
|
case 1:
|
||||||
|
return storiface.Err(storiface.ErrTempAllocateSpace, xerrors.New("foo"))
|
||||||
|
case 2:
|
||||||
|
return nil
|
||||||
|
default:
|
||||||
|
t.Fatalf("what")
|
||||||
|
return xerrors.Errorf("this error didn't ever happen, and will never happen")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
err := retrySoftErr(context.Background(), tf)
|
||||||
|
require.NoError(t, err)
|
||||||
|
require.Equal(t, 2, i)
|
||||||
|
}
|
||||||
|
@ -4,8 +4,10 @@ import (
|
|||||||
"bytes"
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
"io"
|
"io"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"time"
|
||||||
|
|
||||||
"github.com/ipfs/go-cid"
|
"github.com/ipfs/go-cid"
|
||||||
"golang.org/x/xerrors"
|
"golang.org/x/xerrors"
|
||||||
@ -213,6 +215,40 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
|
|||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var SoftErrRetryWait = 5 * time.Second
|
||||||
|
|
||||||
|
func retrySoftErr(ctx context.Context, cb func() error) error {
|
||||||
|
for {
|
||||||
|
err := cb()
|
||||||
|
if err == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var cerr storiface.WorkError
|
||||||
|
|
||||||
|
if errors.As(err, &cerr) {
|
||||||
|
switch cerr.ErrCode() {
|
||||||
|
case storiface.ErrTempWorkerRestart:
|
||||||
|
fallthrough
|
||||||
|
case storiface.ErrTempAllocateSpace:
|
||||||
|
// retry
|
||||||
|
default:
|
||||||
|
// non-temp error
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// retry
|
||||||
|
if ctx.Err() != nil {
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
|
||||||
|
time.Sleep(SoftErrRetryWait)
|
||||||
|
} else {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
|
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
|
||||||
switch err.(type) {
|
switch err.(type) {
|
||||||
@ -269,7 +305,11 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
var pc1o storiface.PreCommit1Out
|
||||||
|
err = retrySoftErr(ctx.Context(), func() (err error) {
|
||||||
|
pc1o, err = m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
||||||
}
|
}
|
||||||
@ -280,7 +320,12 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
|
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
|
||||||
cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
|
var cids storiface.SectorCids
|
||||||
|
|
||||||
|
err := retrySoftErr(ctx.Context(), func() (err error) {
|
||||||
|
cids, err = m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
|
||||||
|
return err
|
||||||
|
})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
|
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
|
||||||
}
|
}
|
||||||
|
@ -186,12 +186,20 @@ const (
|
|||||||
ErrTempAllocateSpace
|
ErrTempAllocateSpace
|
||||||
)
|
)
|
||||||
|
|
||||||
|
type WorkError interface {
|
||||||
|
ErrCode() ErrorCode
|
||||||
|
}
|
||||||
|
|
||||||
type CallError struct {
|
type CallError struct {
|
||||||
Code ErrorCode
|
Code ErrorCode
|
||||||
Message string
|
Message string
|
||||||
sub error
|
sub error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (c *CallError) ErrCode() ErrorCode {
|
||||||
|
return c.Code
|
||||||
|
}
|
||||||
|
|
||||||
func (c *CallError) Error() string {
|
func (c *CallError) Error() string {
|
||||||
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
|
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
|
||||||
}
|
}
|
||||||
@ -204,6 +212,8 @@ func (c *CallError) Unwrap() error {
|
|||||||
return errors.New(c.Message)
|
return errors.New(c.Message)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
var _ WorkError = &CallError{}
|
||||||
|
|
||||||
func Err(code ErrorCode, sub error) *CallError {
|
func Err(code ErrorCode, sub error) *CallError {
|
||||||
return &CallError{
|
return &CallError{
|
||||||
Code: code,
|
Code: code,
|
||||||
|
Loading…
Reference in New Issue
Block a user