Add error codes to worker return

This commit is contained in:
Łukasz Magiera 2020-11-17 16:17:45 +01:00
parent 425ad9c5fa
commit b8853aa4d5
8 changed files with 123 additions and 86 deletions

View File

@ -311,17 +311,17 @@ type StorageMinerStruct struct {
WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"` WorkerStats func(context.Context) (map[uuid.UUID]storiface.WorkerStats, error) `perm:"admin"`
WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"` WorkerJobs func(context.Context) (map[uuid.UUID][]storiface.WorkerJob, error) `perm:"admin"`
ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error `perm:"admin" retry:"true"` ReturnAddPiece func(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error `perm:"admin" retry:"true"` ReturnSealPreCommit1 func(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error `perm:"admin" retry:"true"` ReturnSealPreCommit2 func(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error `perm:"admin" retry:"true"` ReturnSealCommit1 func(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error `perm:"admin" retry:"true"` ReturnSealCommit2 func(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnFinalizeSector func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnReleaseUnsealed func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnMoveStorage func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnUnsealPiece func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err string) error `perm:"admin" retry:"true"` ReturnReadPiece func(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error `perm:"admin" retry:"true"`
ReturnFetch func(ctx context.Context, callID storiface.CallID, err string) error `perm:"admin" retry:"true"` ReturnFetch func(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error `perm:"admin" retry:"true"`
SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"` SealingSchedDiag func(context.Context, bool) (interface{}, error) `perm:"admin"`
SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"` SealingAbort func(ctx context.Context, call storiface.CallID) error `perm:"admin"`
@ -1271,47 +1271,47 @@ func (c *StorageMinerStruct) WorkerJobs(ctx context.Context) (map[uuid.UUID][]st
return c.Internal.WorkerJobs(ctx) return c.Internal.WorkerJobs(ctx)
} }
func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { func (c *StorageMinerStruct) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return c.Internal.ReturnAddPiece(ctx, callID, pi, err) return c.Internal.ReturnAddPiece(ctx, callID, pi, err)
} }
func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { func (c *StorageMinerStruct) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return c.Internal.ReturnSealPreCommit1(ctx, callID, p1o, err) return c.Internal.ReturnSealPreCommit1(ctx, callID, p1o, err)
} }
func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { func (c *StorageMinerStruct) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return c.Internal.ReturnSealPreCommit2(ctx, callID, sealed, err) return c.Internal.ReturnSealPreCommit2(ctx, callID, sealed, err)
} }
func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { func (c *StorageMinerStruct) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return c.Internal.ReturnSealCommit1(ctx, callID, out, err) return c.Internal.ReturnSealCommit1(ctx, callID, out, err)
} }
func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { func (c *StorageMinerStruct) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return c.Internal.ReturnSealCommit2(ctx, callID, proof, err) return c.Internal.ReturnSealCommit2(ctx, callID, proof, err)
} }
func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { func (c *StorageMinerStruct) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnFinalizeSector(ctx, callID, err) return c.Internal.ReturnFinalizeSector(ctx, callID, err)
} }
func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { func (c *StorageMinerStruct) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnReleaseUnsealed(ctx, callID, err) return c.Internal.ReturnReleaseUnsealed(ctx, callID, err)
} }
func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { func (c *StorageMinerStruct) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnMoveStorage(ctx, callID, err) return c.Internal.ReturnMoveStorage(ctx, callID, err)
} }
func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { func (c *StorageMinerStruct) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnUnsealPiece(ctx, callID, err) return c.Internal.ReturnUnsealPiece(ctx, callID, err)
} }
func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { func (c *StorageMinerStruct) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return c.Internal.ReturnReadPiece(ctx, callID, ok, err) return c.Internal.ReturnReadPiece(ctx, callID, ok, err)
} }
func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { func (c *StorageMinerStruct) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return c.Internal.ReturnFetch(ctx, callID, err) return c.Internal.ReturnFetch(ctx, callID, err)
} }

View File

@ -617,47 +617,47 @@ func (m *Manager) Remove(ctx context.Context, sector storage.SectorRef) error {
return err return err
} }
func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { func (m *Manager) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
return m.returnResult(callID, pi, err) return m.returnResult(callID, pi, err)
} }
func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { func (m *Manager) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
return m.returnResult(callID, p1o, err) return m.returnResult(callID, p1o, err)
} }
func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { func (m *Manager) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
return m.returnResult(callID, sealed, err) return m.returnResult(callID, sealed, err)
} }
func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { func (m *Manager) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
return m.returnResult(callID, out, err) return m.returnResult(callID, out, err)
} }
func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { func (m *Manager) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
return m.returnResult(callID, proof, err) return m.returnResult(callID, proof, err)
} }
func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { func (m *Manager) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(callID, nil, err)
} }
func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { func (m *Manager) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(callID, nil, err)
} }
func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { func (m *Manager) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(callID, nil, err)
} }
func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { func (m *Manager) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(callID, nil, err)
} }
func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { func (m *Manager) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
return m.returnResult(callID, ok, err) return m.returnResult(callID, ok, err)
} }
func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { func (m *Manager) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
return m.returnResult(callID, nil, err) return m.returnResult(callID, nil, err)
} }

View File

@ -5,7 +5,6 @@ import (
"crypto/sha256" "crypto/sha256"
"encoding/hex" "encoding/hex"
"encoding/json" "encoding/json"
"errors"
"fmt" "fmt"
"os" "os"
"time" "time"
@ -350,15 +349,12 @@ func (m *Manager) waitCall(ctx context.Context, callID storiface.CallID) (interf
} }
} }
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error { func (m *Manager) returnResult(callID storiface.CallID, r interface{}, cerr *storiface.CallError) error {
var err error
if serr != "" {
err = errors.New(serr)
}
res := result{ res := result{
r: r, r: r,
err: err, }
if cerr != nil {
res.err = cerr
} }
m.sched.workTracker.onDone(callID) m.sched.workTracker.onDone(callID)
@ -392,7 +388,7 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
m.results[wid] = res m.results[wid] = res
err = m.work.Get(wid).Mutate(func(ws *WorkState) error { err := m.work.Get(wid).Mutate(func(ws *WorkState) error {
ws.Status = wsDone ws.Status = wsDone
return nil return nil
}) })
@ -416,5 +412,6 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr stri
} }
func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error { func (m *Manager) Abort(ctx context.Context, call storiface.CallID) error {
return m.returnResult(call, nil, "task aborted") // TODO: Allow temp error
return m.returnResult(call, nil, storiface.Err(storiface.ErrUnknown, xerrors.New("task aborted")))
} }

View File

@ -418,47 +418,47 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, pp abi.RegisteredPoStPr
return bad, nil return bad, nil
} }
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error { func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error { func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error { func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error { func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error { func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error { func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error { func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error { func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error { func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error { func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }
func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error { func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err *storiface.CallError) error {
panic("not supported") panic("not supported")
} }

View File

@ -361,7 +361,7 @@ func (st *Local) Reserve(ctx context.Context, sid storage.SectorRef, ft storifac
overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen overhead := int64(overheadTab[fileType]) * int64(ssize) / storiface.FSOverheadDen
if stat.Available < overhead { if stat.Available < overhead {
return nil, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available) return nil, storiface.Err(storiface.ErrTempAllocateSpace, xerrors.Errorf("can't reserve %d bytes in '%s' (id:%s), only %d available", overhead, p.local, id, stat.Available))
} }
p.reserved += overhead p.reserved += overhead

View File

@ -90,16 +90,49 @@ type WorkerCalls interface {
Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error) Fetch(context.Context, storage.SectorRef, SectorFileType, PathType, AcquireMode) (CallID, error)
} }
type WorkerReturn interface { type ErrorCode int
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err string) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err string) error const (
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err string) error ErrUnknown ErrorCode = iota
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err string) error )
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err string) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err string) error const (
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err string) error // Temp Errors
ReturnMoveStorage(ctx context.Context, callID CallID, err string) error ErrTempUnknown ErrorCode = iota + 100
ReturnUnsealPiece(ctx context.Context, callID CallID, err string) error ErrTempWorkerRestart
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err string) error ErrTempAllocateSpace
ReturnFetch(ctx context.Context, callID CallID, err string) error )
type CallError struct {
Code ErrorCode
Sub error
}
func (c *CallError) Error() string {
return fmt.Sprintf("storage call error %d: %s", c.Code, c.Sub.Error())
}
func (c *CallError) Unwrap() error {
return c.Sub
}
func Err(code ErrorCode, sub error) *CallError {
return &CallError{
Code: code,
Sub: sub,
}
}
type WorkerReturn interface {
ReturnAddPiece(ctx context.Context, callID CallID, pi abi.PieceInfo, err *CallError) error
ReturnSealPreCommit1(ctx context.Context, callID CallID, p1o storage.PreCommit1Out, err *CallError) error
ReturnSealPreCommit2(ctx context.Context, callID CallID, sealed storage.SectorCids, err *CallError) error
ReturnSealCommit1(ctx context.Context, callID CallID, out storage.Commit1Out, err *CallError) error
ReturnSealCommit2(ctx context.Context, callID CallID, proof storage.Proof, err *CallError) error
ReturnFinalizeSector(ctx context.Context, callID CallID, err *CallError) error
ReturnReleaseUnsealed(ctx context.Context, callID CallID, err *CallError) error
ReturnMoveStorage(ctx context.Context, callID CallID, err *CallError) error
ReturnUnsealPiece(ctx context.Context, callID CallID, err *CallError) error
ReturnReadPiece(ctx context.Context, callID CallID, ok bool, err *CallError) error
ReturnFetch(ctx context.Context, callID CallID, err *CallError) error
} }

View File

@ -61,7 +61,7 @@ func (t *testWorker) asyncCall(sector storage.SectorRef, work func(ci storiface.
func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) { func (t *testWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) { return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData) p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
if err := t.ret.ReturnAddPiece(ctx, ci, p, errstr(err)); err != nil { if err := t.ret.ReturnAddPiece(ctx, ci, p, toCallError(err)); err != nil {
log.Error(err) log.Error(err)
} }
}) })
@ -79,7 +79,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe
defer t.pc1lk.Unlock() defer t.pc1lk.Unlock()
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces) p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil { if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, toCallError(err)); err != nil {
log.Error(err) log.Error(err)
} }
}) })
@ -87,7 +87,7 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRe
func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) { func (t *testWorker) Fetch(ctx context.Context, sector storage.SectorRef, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) { return t.asyncCall(sector, func(ci storiface.CallID) {
if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil { if err := t.ret.ReturnFetch(ctx, ci, nil); err != nil {
log.Error(err) log.Error(err)
} }
}) })

View File

@ -90,7 +90,10 @@ func newLocalWorker(executor ExecutorFunc, wcfg WorkerConfig, store stores.Store
go func() { go func() {
for _, call := range unfinished { for _, call := range unfinished {
err := xerrors.Errorf("worker restarted") err := &storiface.CallError{
Sub: xerrors.New("worker restarted"),
Code: storiface.ErrTempWorkerRestart,
}
// TODO: Handle restarting PC1 once support is merged // TODO: Handle restarting PC1 once support is merged
@ -166,15 +169,15 @@ const (
// in: func(WorkerReturn, context.Context, CallID, err string) // in: func(WorkerReturn, context.Context, CallID, err string)
// in: func(WorkerReturn, context.Context, CallID, ret T, err string) // in: func(WorkerReturn, context.Context, CallID, ret T, err string)
func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error { func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error {
rf := reflect.ValueOf(in) rf := reflect.ValueOf(in)
ft := rf.Type() ft := rf.Type()
withRet := ft.NumIn() == 5 withRet := ft.NumIn() == 5
return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err error) error { return func(ctx context.Context, ci storiface.CallID, wr storiface.WorkerReturn, i interface{}, err *storiface.CallError) error {
rctx := reflect.ValueOf(ctx) rctx := reflect.ValueOf(ctx)
rwr := reflect.ValueOf(wr) rwr := reflect.ValueOf(wr)
rerr := reflect.ValueOf(errstr(err)) rerr := reflect.ValueOf(err)
rci := reflect.ValueOf(ci) rci := reflect.ValueOf(ci)
var ro []reflect.Value var ro []reflect.Value
@ -198,7 +201,7 @@ func rfunc(in interface{}) func(context.Context, storiface.CallID, storiface.Wor
} }
} }
var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, error) error{ var returnFunc = map[ReturnType]func(context.Context, storiface.CallID, storiface.WorkerReturn, interface{}, *storiface.CallError) error{
AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece), AddPiece: rfunc(storiface.WorkerReturn.ReturnAddPiece),
SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1), SealPreCommit1: rfunc(storiface.WorkerReturn.ReturnSealPreCommit1),
SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2), SealPreCommit2: rfunc(storiface.WorkerReturn.ReturnSealPreCommit2),
@ -245,7 +248,7 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
} }
} }
if doReturn(ctx, rt, ci, l.ret, res, err) { if doReturn(ctx, rt, ci, l.ret, res, toCallError(err)) {
if err := l.ct.onReturned(ci); err != nil { if err := l.ct.onReturned(ci); err != nil {
log.Errorf("tracking call (done): %+v", err) log.Errorf("tracking call (done): %+v", err)
} }
@ -255,8 +258,20 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector storage.SectorRef, r
return ci, nil return ci, nil
} }
func toCallError(err error) *storiface.CallError {
var serr *storiface.CallError
if err != nil && !xerrors.As(err, &serr) {
serr = &storiface.CallError{
Sub: err,
Code: storiface.ErrUnknown,
}
}
return serr
}
// doReturn tries to send the result to manager, returns true if successful // doReturn tries to send the result to manager, returns true if successful
func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr error) bool { func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr *storiface.CallError) bool {
for { for {
err := returnFunc[rt](ctx, ci, ret, res, rerr) err := returnFunc[rt](ctx, ci, ret, res, rerr)
if err == nil { if err == nil {
@ -279,14 +294,6 @@ func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret stori
return true return true
} }
func errstr(err error) string {
if err != nil {
return err.Error()
}
return ""
}
func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error { func (l *LocalWorker) NewSector(ctx context.Context, sector storage.SectorRef) error {
sb, err := l.executor() sb, err := l.executor()
if err != nil { if err != nil {