Merge pull request #4890 from filecoin-project/feat/worker-error-codes

Add error codes to worker return
This commit is contained in:
Łukasz Magiera 2020-11-17 21:29:42 +01:00 committed by GitHub
commit 65202668af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
10 changed files with 170 additions and 97 deletions

View File

@ -311,17 +311,17 @@ type StorageMinerStruct struct {
WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"`
ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin" retry:"true"`
ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin" retry:"true"`
ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error `perm:"admin" retry:"true"`
ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error `perm:"admin" retry:"true"`
ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error `perm:"admin" retry:"true"`
ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err string) error `perm:"admin" retry:"true"`
ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"`
ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnFetch func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"`
SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"`
@ -1271,47 +1271,47 @@ func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uuid.UUID][]st
return c.Internal.WorkerJobs(ctx)
}
func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error {
func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return c.Internal.ReturnAddPiece(ctx, callID, pi, err)
}
func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error {
func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return c.Internal.ReturnSealPreCommit1(ctx, callID, p1o, err)
}
func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error {
func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return c.Internal.ReturnSealPreCommit2(ctx, callID, sealed, err)
}
func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error {
func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return c.Internal.ReturnSealCommit1(ctx, callID, out, err)
}
func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error {
func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return c.Internal.ReturnSealCommit2(ctx, callID, proof, err)
}
func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error {
func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnFinalizeSector(ctx, callID, err)
}
func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error {
func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnReleaseUnsealed(ctx, callID, err)
}
func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error {
func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnMoveStorage(ctx, callID, err)
}
func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error {
func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnUnsealPiece(ctx, callID, err)
}
func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error {
func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return c.Internal.ReturnReadPiece(ctx, callID, ok, err)
}
func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error {
func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnFetch(ctx, callID, err)
}

View File

@ -233,6 +233,7 @@ func init() {
CpuUse: 0,
},
})
addExample(storiface.ErrorCode(0))
// worker specific
addExample(storiface.AcquireMove)

View File

@ -987,7 +987,10 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1008,7 +1011,10 @@ Inputs:
},
"ID": "07070707-0707-0707-0707-070707070707"
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1029,7 +1035,10 @@ Inputs:
},
"ID": "07070707-0707-0707-0707-070707070707"
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1050,7 +1059,10 @@ Inputs:
},
"ID": "07070707-0707-0707-0707-070707070707"
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1072,7 +1084,10 @@ Inputs:
"ID": "07070707-0707-0707-0707-070707070707"
},
true,
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1093,7 +1108,10 @@ Inputs:
},
"ID": "07070707-0707-0707-0707-070707070707"
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1115,7 +1133,10 @@ Inputs:
"ID": "07070707-0707-0707-0707-070707070707"
},
null,
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1137,7 +1158,10 @@ Inputs:
"ID": "07070707-0707-0707-0707-070707070707"
},
null,
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1159,7 +1183,10 @@ Inputs:
"ID": "07070707-0707-0707-0707-070707070707"
},
null,
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1188,7 +1215,10 @@ Inputs:
"/": "bafy2bzacea3wsdh6y3a36tb3skempjoxqpuyompjbmfeyf34fi3uy6uue42v4"
}
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```
@ -1209,7 +1239,10 @@ Inputs:
},
"ID": "07070707-0707-0707-0707-070707070707"
},
"string value"
{
"Code": 0,
"Message": "string value"
}
]
```

View File

@ -617,47 +617,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
return err
}
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error {
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err)
}
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error {
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err)
}
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error {
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err)
}
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error {
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err)
}
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error {
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err)
}
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error {
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
}
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error {
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
}
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error {
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
}
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error {
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
}
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error {
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err)
}
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error {
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err)
}

View File

@ -5,7 +5,6 @@ import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"os"
"time"
@ -350,15 +349,12 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
}
}
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error {
var err error
if serr != "" {
err = errors.New(serr)
}
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
res := result{
r: r,
err: err,
}
if cerr != nil {
res.err = cerr
}
m.sched.workTracker.onDone(callID)
@ -392,7 +388,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
m.results[wid] = res
err = m.work.Get(wid).Mutate(func(ws *WorkState) error {
err := m.work.Get(wid).Mutate(func(ws *WorkState) error {
ws.Status = wsDone
return nil
})
@ -416,5 +412,6 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
}
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
return m.returnResult(call, nil, "task aborted")
// TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
}

View File

@ -418,47 +418,47 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr
return bad, nil
}
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error {
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error {
func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error {
func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error {
func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error {
func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error {
func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error {
func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error {
func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error {
func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error {
func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error {
func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported")
}

View File

@ -361,7 +361,7 @@ func (st *Local) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen
if stat.Available < overhead {
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)
return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available))
}
p.reserved += overhead

View File

@ -2,6 +2,7 @@ package storiface
import (
"context"
"errors"
"fmt"
"io"
"time"
@ -90,16 +91,56 @@ type WorkerCalls interface {
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
}
type WorkerReturn interface {
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err string) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err string) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err string) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err string) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err string) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err string) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err string) error
ReturnMoveStorage(ctx context.Context, callID CallID, err string) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err string) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err string) error
ReturnFetch(ctx context.Context, callID CallID, err string) error
type ErrorCode int
const (
ErrUnknown ErrorCode = iota
)
const (
// Temp Errors
ErrTempUnknown ErrorCode = iota + 100
ErrTempWorkerRestart
ErrTempAllocateSpace
)
type CallError struct {
Code ErrorCode
Message string
sub error
}
func (c *CallError) Error() string {
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Message)
}
func (c *CallError) Unwrap() error {
if c.sub != nil {
return c.sub
}
return errors.New(c.Message)
}
func Err(code ErrorCode, sub error) *CallError {
return &CallError{
Code: code,
Message: sub.Error(),
sub: sub,
}
}
type WorkerReturn interface {
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
}

View File

@ -61,7 +61,7 @@ func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface.
func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
if err := t.ret.ReturnAddPiece(ctx, ci, p, errstr(err)); err != nil {
if err := t.ret.ReturnAddPiece(ctx, ci, p, toCallError(err)); err != nil {
log.Error(err)
}
})
@ -79,7 +79,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe
defer t.pc1lk.Unlock()
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil {
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, toCallError(err)); err != nil {
log.Error(err)
}
})
@ -87,7 +87,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe
func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil {
if err := t.ret.ReturnFetch(ctx, ci, nil); err != nil {
log.Error(err)
}
})

View File

@ -90,7 +90,7 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
go func() {
for _, call := range unfinished {
err := xerrors.Errorf("worker restarted")
err := storiface.Err(storiface.ErrTempWorkerRestart, xerrors.New("worker restarted"))
// TODO: Handle restarting PC1 once support is merged
@ -166,15 +166,15 @@ const (
// in: func(WorkerReturn, context.Context, CallID, err string)
// in: func(WorkerReturn, context.Context, CallID, ret T, err string)
func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error {
func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error {
rf := reflect.ValueOf(in)
ft := rf.Type()
withRet := ft.NumIn() == 5
return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err error) error {
return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err *storiface.CallError) error {
rctx := reflect.ValueOf(ctx)
rwr := reflect.ValueOf(wr)
rerr := reflect.ValueOf(errstr(err))
rerr := reflect.ValueOf(err)
rci := reflect.ValueOf(ci)
var ro []reflect.Value
@ -198,7 +198,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
}
}
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
@ -245,7 +245,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
}
}
if doReturn(ctx, rt, ci, l.ret, res, err) {
if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) {
if err := l.ct.onReturned(ci); err != nil {
log.Errorf("tracking call (done): %+v", err)
}
@ -255,8 +255,17 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
return ci, nil
}
func toCallError(err error) *storiface.CallError {
var serr *storiface.CallError
if err != nil && !xerrors.As(err, &serr) {
serr = storiface.Err(storiface.ErrUnknown, err)
}
return serr
}
// doReturn tries to send the result to manager, returns true if successful
func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr error) bool {
func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr *storiface.CallError) bool {
for {
err := returnFunc[rt](ctx, ci, ret, res, rerr)
if err == nil {
@ -279,14 +288,6 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori
return true
}
func errstr(err error) string {
if err != nil {
return err.Error()
}
return ""
}
func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error {
sb, err := l.executor()
if err != nil {