Merge pull request #11087 from filecoin-project/fix/pc1-retr-loop
fix: sealing pipeline: Fix PC1 retry loop
This commit is contained in:
commit
366329b085
@ -31,7 +31,7 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
|
||||
cw := cbg.NewCborWriter(w)
|
||||
|
||||
if _, err := cw.Write([]byte{184, 38}); err != nil {
|
||||
if _, err := cw.Write([]byte{184, 39}); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
@ -565,6 +565,22 @@ func (t *SectorInfo) MarshalCBOR(w io.Writer) error {
|
||||
}
|
||||
}
|
||||
|
||||
// t.PreCommit1Fails (uint64) (uint64)
|
||||
if len("PreCommit1Fails") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"PreCommit1Fails\" was too long")
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("PreCommit1Fails"))); err != nil {
|
||||
return err
|
||||
}
|
||||
if _, err := io.WriteString(w, string("PreCommit1Fails")); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.PreCommit1Fails)); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// t.PreCommit2Fails (uint64) (uint64)
|
||||
if len("PreCommit2Fails") > cbg.MaxLength {
|
||||
return xerrors.Errorf("Value in field \"PreCommit2Fails\" was too long")
|
||||
@ -1402,6 +1418,21 @@ func (t *SectorInfo) UnmarshalCBOR(r io.Reader) (err error) {
|
||||
t.UpdateUnsealed = &c
|
||||
}
|
||||
|
||||
}
|
||||
// t.PreCommit1Fails (uint64) (uint64)
|
||||
case "PreCommit1Fails":
|
||||
|
||||
{
|
||||
|
||||
maj, extra, err = cr.ReadHeader()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if maj != cbg.MajUnsignedInt {
|
||||
return fmt.Errorf("wrong type for uint64 field")
|
||||
}
|
||||
t.PreCommit1Fails = uint64(extra)
|
||||
|
||||
}
|
||||
// t.PreCommit2Fails (uint64) (uint64)
|
||||
case "PreCommit2Fails":
|
||||
|
@ -182,6 +182,8 @@ func (evt SectorSealPreCommit1Failed) FormatError(xerrors.Printer) (next error)
|
||||
func (evt SectorSealPreCommit1Failed) apply(si *SectorInfo) {
|
||||
si.InvalidProofs = 0 // reset counter
|
||||
si.PreCommit2Fails = 0
|
||||
|
||||
si.PreCommit1Fails++
|
||||
}
|
||||
|
||||
type SectorSealPreCommit2Failed struct{ error }
|
||||
|
@ -1,14 +1,18 @@
|
||||
package sealing
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
|
||||
logging "github.com/ipfs/go-log/v2"
|
||||
"github.com/stretchr/testify/require"
|
||||
"golang.org/x/xerrors"
|
||||
|
||||
"github.com/filecoin-project/go-address"
|
||||
"github.com/filecoin-project/go-state-types/abi"
|
||||
"github.com/filecoin-project/go-statemachine"
|
||||
|
||||
"github.com/filecoin-project/lotus/storage/sealer/storiface"
|
||||
)
|
||||
|
||||
func init() {
|
||||
@ -451,3 +455,24 @@ func TestCreationTimeCleared(t *testing.T) {
|
||||
|
||||
require.NotEqual(t, int64(0), m.state.CreationTime)
|
||||
}
|
||||
|
||||
func TestRetrySoftErr(t *testing.T) {
|
||||
i := 0
|
||||
|
||||
tf := func() error {
|
||||
i++
|
||||
switch i {
|
||||
case 1:
|
||||
return storiface.Err(storiface.ErrTempAllocateSpace, xerrors.New("foo"))
|
||||
case 2:
|
||||
return nil
|
||||
default:
|
||||
t.Fatalf("what")
|
||||
return xerrors.Errorf("this error didn't ever happen, and will never happen")
|
||||
}
|
||||
}
|
||||
|
||||
err := retrySoftErr(context.Background(), tf)
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 2, i)
|
||||
}
|
||||
|
@ -54,7 +54,13 @@ func (m *Sealing) checkPreCommitted(ctx statemachine.Context, sector SectorInfo)
|
||||
return info, true
|
||||
}
|
||||
|
||||
var MaxPreCommit1Retries = uint64(3)
|
||||
|
||||
func (m *Sealing) handleSealPrecommit1Failed(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if sector.PreCommit1Fails > MaxPreCommit1Retries {
|
||||
return ctx.Send(SectorRemove{})
|
||||
}
|
||||
|
||||
if err := failedCooldown(ctx, sector); err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -4,8 +4,10 @@ import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"github.com/ipfs/go-cid"
|
||||
"golang.org/x/xerrors"
|
||||
@ -213,6 +215,41 @@ func (m *Sealing) handleGetTicket(ctx statemachine.Context, sector SectorInfo) e
|
||||
})
|
||||
}
|
||||
|
||||
var SoftErrRetryWait = 5 * time.Second
|
||||
|
||||
func retrySoftErr(ctx context.Context, cb func() error) error {
|
||||
for {
|
||||
err := cb()
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
|
||||
var cerr storiface.WorkError
|
||||
|
||||
if errors.As(err, &cerr) {
|
||||
switch cerr.ErrCode() {
|
||||
case storiface.ErrTempWorkerRestart:
|
||||
fallthrough
|
||||
case storiface.ErrTempAllocateSpace:
|
||||
// retry
|
||||
default:
|
||||
// non-temp error
|
||||
return err
|
||||
}
|
||||
|
||||
// check if the context got cancelled early
|
||||
if ctx.Err() != nil {
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// retry
|
||||
time.Sleep(SoftErrRetryWait)
|
||||
} else {
|
||||
return err
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo) error {
|
||||
if err := checkPieces(ctx.Context(), m.maddr, sector.SectorNumber, sector.Pieces, m.Api, false); err != nil { // Sanity check state
|
||||
switch err.(type) {
|
||||
@ -269,7 +306,11 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
|
||||
}
|
||||
}
|
||||
|
||||
pc1o, err := m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
||||
var pc1o storiface.PreCommit1Out
|
||||
err = retrySoftErr(ctx.Context(), func() (err error) {
|
||||
pc1o, err = m.sealer.SealPreCommit1(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.TicketValue, sector.pieceInfos())
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return ctx.Send(SectorSealPreCommit1Failed{xerrors.Errorf("seal pre commit(1) failed: %w", err)})
|
||||
}
|
||||
@ -280,7 +321,12 @@ func (m *Sealing) handlePreCommit1(ctx statemachine.Context, sector SectorInfo)
|
||||
}
|
||||
|
||||
func (m *Sealing) handlePreCommit2(ctx statemachine.Context, sector SectorInfo) error {
|
||||
cids, err := m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
|
||||
var cids storiface.SectorCids
|
||||
|
||||
err := retrySoftErr(ctx.Context(), func() (err error) {
|
||||
cids, err = m.sealer.SealPreCommit2(sector.sealingCtx(ctx.Context()), m.minerSector(sector.SectorType, sector.SectorNumber), sector.PreCommit1Out)
|
||||
return err
|
||||
})
|
||||
if err != nil {
|
||||
return ctx.Send(SectorSealPreCommit2Failed{xerrors.Errorf("seal pre commit(2) failed: %w", err)})
|
||||
}
|
||||
|
@ -56,6 +56,8 @@ type SectorInfo struct {
|
||||
TicketEpoch abi.ChainEpoch
|
||||
PreCommit1Out storiface.PreCommit1Out
|
||||
|
||||
PreCommit1Fails uint64
|
||||
|
||||
// PreCommit2
|
||||
CommD *cid.Cid
|
||||
CommR *cid.Cid // SectorKey
|
||||
|
@ -186,12 +186,20 @@ const (
|
||||
ErrTempAllocateSpace
|
||||
)
|
||||
|
||||
type WorkError interface {
|
||||
ErrCode() ErrorCode
|
||||
}
|
||||
|
||||
type CallError struct {
|
||||
Code ErrorCode
|
||||
Message string
|
||||
sub error
|
||||
}
|
||||
|
||||
func (c *CallError) ErrCode() ErrorCode {
|
||||
return c.Code
|
||||
}
|
||||
|
||||
func (c *CallError) Error() string {
|
||||
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
|
||||
}
|
||||
@ -204,6 +212,8 @@ func (c *CallError) Unwrap() error {
|
||||
return errors.New(c.Message)
|
||||
}
|
||||
|
||||
var _ WorkError = &CallError{}
|
||||
|
||||
func Err(code ErrorCode, sub error) *CallError {
|
||||
return &CallError{
|
||||
Code: code,
|
||||
|
Loading…
Reference in New Issue
Block a user