diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 9733fc76f..ec027b4e2 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -92,13 +92,12 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf for _, call := range unfinished { err := xerrors.Errorf("worker restarted") - if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil { - log.Errorf("return error: %s: %+v", call.RetType, err) - continue - } + // TODO: Handle restarting PC1 once support is merged - if err := w.ct.onReturned(call.ID); err != nil { - log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err) + if doReturn(context.TODO(), call.RetType, call.ID, ret, nil, err) { + if err := w.ct.onReturned(call.ID); err != nil { + log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err) + } } } }() @@ -231,22 +230,42 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret log.Errorf("tracking call (done): %+v", err) } } - } - if err := returnFunc[rt](ctx, ci, l.ret, res, err); err != nil { - log.Errorf("return error: %s: %+v", rt, err) - return - } - - if err := l.ct.onReturned(ci); err != nil { - log.Errorf("tracking call (done): %+v", err) + if doReturn(ctx, rt, ci, l.ret, res, err) { + if err := l.ct.onReturned(ci); err != nil { + log.Errorf("tracking call (done): %+v", err) + } } }() return ci, nil } +// doReturn tries to send the result to manager, returns true if successful +func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr error) bool { + for { + err := returnFunc[rt](ctx, ci, ret, res, rerr) + if err == nil { + break + } + + log.Errorf("return error, will retry in 5s: %s: %+v", rt, err) + select { + case <-time.After(5 * time.Second): + case <-ctx.Done(): + log.Errorf("failed to return results: %s", ctx.Err()) + + // fine to just return, worker is most likely shutting down, and + // we didn't mark the result as returned yet, so we'll try to + // re-submit it on restart + return false + } + } + + return true +} + func errstr(err error) string { if err != nil { return err.Error()