From b8853aa4d5fde74785288c8515d0b08761757634 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 17 Nov 2020 16:17:45 +0100 Subject: [PATCH] Add error codes to worker return --- api/apistruct/struct.go | 44 +++++++-------- extern/sector-storage/manager.go | 22 ++++---- extern/sector-storage/manager_calltracker.go | 19 +++---- extern/sector-storage/mock/mock.go | 22 ++++---- extern/sector-storage/stores/local.go | 2 +- extern/sector-storage/storiface/worker.go | 57 +++++++++++++++----- extern/sector-storage/testworker_test.go | 6 +-- extern/sector-storage/worker_local.go | 37 +++++++------ 8 files changed, 123 insertions(+), 86 deletions(-) diff --git a/api/apistruct/struct.go b/api/apistruct/struct.go index 5b0ffcd6e..214f56422 100644 --- a/api/apistruct/struct.go +++ b/api/apistruct/struct.go @@ -311,17 +311,17 @@ type StorageMinerStruct struct { WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"` WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"` - ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin" retry:"true"` - ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin" retry:"true"` - ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error `perm:"admin" retry:"true"` - ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error `perm:"admin" retry:"true"` - ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error `perm:"admin" retry:"true"` - ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` - ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` - ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` - ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` - ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err string) error `perm:"admin" retry:"true"` - ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` + ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error `perm:"admin" retry:"true"` + ReturnFetch func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"` SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"` @@ -1271,47 +1271,47 @@ func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uuid.UUID][]st return c.Internal.WorkerJobs(ctx) } -func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { +func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { return c.Internal.ReturnAddPiece(ctx, callID, pi, err) } -func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { +func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error { return c.Internal.ReturnSealPreCommit1(ctx, callID, p1o, err) } -func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { +func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error { return c.Internal.ReturnSealPreCommit2(ctx, callID, sealed, err) } -func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { +func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error { return c.Internal.ReturnSealCommit1(ctx, callID, out, err) } -func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { +func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error { return c.Internal.ReturnSealCommit2(ctx, callID, proof, err) } -func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { +func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return c.Internal.ReturnFinalizeSector(ctx, callID, err) } -func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { +func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return c.Internal.ReturnReleaseUnsealed(ctx, callID, err) } -func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { +func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return c.Internal.ReturnMoveStorage(ctx, callID, err) } -func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { +func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return c.Internal.ReturnUnsealPiece(ctx, callID, err) } -func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { +func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error { return c.Internal.ReturnReadPiece(ctx, callID, ok, err) } -func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { +func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return c.Internal.ReturnFetch(ctx, callID, err) } diff --git a/extern/sector-storage/manager.go b/extern/sector-storage/manager.go index eec19c9f2..52e079d75 100644 --- a/extern/sector-storage/manager.go +++ b/extern/sector-storage/manager.go @@ -617,47 +617,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error { return err } -func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { +func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { return m.returnResult(callID, pi, err) } -func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { +func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error { return m.returnResult(callID, p1o, err) } -func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { +func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error { return m.returnResult(callID, sealed, err) } -func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { +func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error { return m.returnResult(callID, out, err) } -func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { +func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error { return m.returnResult(callID, proof, err) } -func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { +func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(callID, nil, err) } -func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { +func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(callID, nil, err) } -func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { +func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(callID, nil, err) } -func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { +func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(callID, nil, err) } -func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { +func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error { return m.returnResult(callID, ok, err) } -func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { +func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { return m.returnResult(callID, nil, err) } diff --git a/extern/sector-storage/manager_calltracker.go b/extern/sector-storage/manager_calltracker.go index 9b39a5a70..e2f801303 100644 --- a/extern/sector-storage/manager_calltracker.go +++ b/extern/sector-storage/manager_calltracker.go @@ -5,7 +5,6 @@ import ( "crypto/sha256" "encoding/hex" "encoding/json" - "errors" "fmt" "os" "time" @@ -350,15 +349,12 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf } } -func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error { - var err error - if serr != "" { - err = errors.New(serr) - } - +func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error { res := result{ - r: r, - err: err, + r: r, + } + if cerr != nil { + res.err = cerr } m.sched.workTracker.onDone(callID) @@ -392,7 +388,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri m.results[wid] = res - err = m.work.Get(wid).Mutate(func(ws *WorkState) error { + err := m.work.Get(wid).Mutate(func(ws *WorkState) error { ws.Status = wsDone return nil }) @@ -416,5 +412,6 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri } func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error { - return m.returnResult(call, nil, "task aborted") + // TODO: Allow temp error + return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted"))) } diff --git a/extern/sector-storage/mock/mock.go b/extern/sector-storage/mock/mock.go index 5e85d6ef7..747fcdf8b 100644 --- a/extern/sector-storage/mock/mock.go +++ b/extern/sector-storage/mock/mock.go @@ -418,47 +418,47 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr return bad, nil } -func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { +func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { +func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { +func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { +func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { +func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { +func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { +func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { +func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { +func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { +func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error { panic("not supported") } -func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { +func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error { panic("not supported") } diff --git a/extern/sector-storage/stores/local.go b/extern/sector-storage/stores/local.go index 026f096eb..c39e76f18 100644 --- a/extern/sector-storage/stores/local.go +++ b/extern/sector-storage/stores/local.go @@ -361,7 +361,7 @@ func (st *Local) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen if stat.Available < overhead { - return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available) + return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available)) } p.reserved += overhead diff --git a/extern/sector-storage/storiface/worker.go b/extern/sector-storage/storiface/worker.go index 4e52fa04c..d329521e7 100644 --- a/extern/sector-storage/storiface/worker.go +++ b/extern/sector-storage/storiface/worker.go @@ -90,16 +90,49 @@ type WorkerCalls interface { Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error) } -type WorkerReturn interface { - ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err string) error - ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err string) error - ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err string) error - ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err string) error - ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err string) error - ReturnFinalizeSector(ctx context.Context, callID CallID, err string) error - ReturnReleaseUnsealed(ctx context.Context, callID CallID, err string) error - ReturnMoveStorage(ctx context.Context, callID CallID, err string) error - ReturnUnsealPiece(ctx context.Context, callID CallID, err string) error - ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err string) error - ReturnFetch(ctx context.Context, callID CallID, err string) error +type ErrorCode int + +const ( + ErrUnknown ErrorCode = iota +) + +const ( + // Temp Errors + ErrTempUnknown ErrorCode = iota + 100 + ErrTempWorkerRestart + ErrTempAllocateSpace +) + +type CallError struct { + Code ErrorCode + Sub error +} + +func (c *CallError) Error() string { + return fmt.Sprintf("storage call error %d: %s", c.Code, c.Sub.Error()) +} + +func (c *CallError) Unwrap() error { + return c.Sub +} + +func Err(code ErrorCode, sub error) *CallError { + return &CallError{ + Code: code, + Sub: sub, + } +} + +type WorkerReturn interface { + ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error + ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error + ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error + ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error + ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error + ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error + ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error + ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error + ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error + ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error + ReturnFetch(ctx context.Context, callID CallID, err *CallError) error } diff --git a/extern/sector-storage/testworker_test.go b/extern/sector-storage/testworker_test.go index 94dc815f0..2fe99f3d4 100644 --- a/extern/sector-storage/testworker_test.go +++ b/extern/sector-storage/testworker_test.go @@ -61,7 +61,7 @@ func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface. func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { return t.asyncCall(sector, func(ci storiface.CallID) { p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) - if err := t.ret.ReturnAddPiece(ctx, ci, p, errstr(err)); err != nil { + if err := t.ret.ReturnAddPiece(ctx, ci, p, toCallError(err)); err != nil { log.Error(err) } }) @@ -79,7 +79,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe defer t.pc1lk.Unlock() p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces) - if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil { + if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, toCallError(err)); err != nil { log.Error(err) } }) @@ -87,7 +87,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { return t.asyncCall(sector, func(ci storiface.CallID) { - if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil { + if err := t.ret.ReturnFetch(ctx, ci, nil); err != nil { log.Error(err) } }) diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 7ab23e335..c1e8e6e81 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -90,7 +90,10 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store go func() { for _, call := range unfinished { - err := xerrors.Errorf("worker restarted") + err := &storiface.CallError{ + Sub: xerrors.New("worker restarted"), + Code: storiface.ErrTempWorkerRestart, + } // TODO: Handle restarting PC1 once support is merged @@ -166,15 +169,15 @@ const ( // in: func(WorkerReturn, context.Context, CallID, err string) // in: func(WorkerReturn, context.Context, CallID, ret T, err string) -func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error { +func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error { rf := reflect.ValueOf(in) ft := rf.Type() withRet := ft.NumIn() == 5 - return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err error) error { + return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err *storiface.CallError) error { rctx := reflect.ValueOf(ctx) rwr := reflect.ValueOf(wr) - rerr := reflect.ValueOf(errstr(err)) + rerr := reflect.ValueOf(err) rci := reflect.ValueOf(ci) var ro []reflect.Value @@ -198,7 +201,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor } } -var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{ +var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{ AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece), SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1), SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2), @@ -245,7 +248,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r } } - if doReturn(ctx, rt, ci, l.ret, res, err) { + if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) { if err := l.ct.onReturned(ci); err != nil { log.Errorf("tracking call (done): %+v", err) } @@ -255,8 +258,20 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r return ci, nil } +func toCallError(err error) *storiface.CallError { + var serr *storiface.CallError + if err != nil && !xerrors.As(err, &serr) { + serr = &storiface.CallError{ + Sub: err, + Code: storiface.ErrUnknown, + } + } + + return serr +} + // doReturn tries to send the result to manager, returns true if successful -func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr error) bool { +func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr *storiface.CallError) bool { for { err := returnFunc[rt](ctx, ci, ret, res, rerr) if err == nil { @@ -279,14 +294,6 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori return true } -func errstr(err error) string { - if err != nil { - return err.Error() - } - - return "" -} - func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error { sb, err := l.executor() if err != nil {