localworker: Try very hard to get ruselts to manager
This commit is contained in:
parent
dbb421c4f7
commit
8c86ea6b75
47
extern/sector-storage/worker_local.go
vendored
47
extern/sector-storage/worker_local.go
vendored
@ -92,13 +92,12 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf
|
||||
for _, call := range unfinished {
|
||||
err := xerrors.Errorf("worker restarted")
|
||||
|
||||
if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil {
|
||||
log.Errorf("return error: %s: %+v", call.RetType, err)
|
||||
continue
|
||||
}
|
||||
// TODO: Handle restarting PC1 once support is merged
|
||||
|
||||
if err := w.ct.onReturned(call.ID); err != nil {
|
||||
log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err)
|
||||
if doReturn(context.TODO(), call.RetType, call.ID, ret, nil, err) {
|
||||
if err := w.ct.onReturned(call.ID); err != nil {
|
||||
log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
@ -231,22 +230,42 @@ func (l *LocalWorker) asyncCall(ctx context.Context, sector abi.SectorID, rt Ret
|
||||
log.Errorf("tracking call (done): %+v", err)
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
if err := returnFunc[rt](ctx, ci, l.ret, res, err); err != nil {
|
||||
log.Errorf("return error: %s: %+v", rt, err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := l.ct.onReturned(ci); err != nil {
|
||||
log.Errorf("tracking call (done): %+v", err)
|
||||
if doReturn(ctx, rt, ci, l.ret, res, err) {
|
||||
if err := l.ct.onReturned(ci); err != nil {
|
||||
log.Errorf("tracking call (done): %+v", err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
return ci, nil
|
||||
}
|
||||
|
||||
// doReturn tries to send the result to manager, returns true if successful
|
||||
func doReturn(ctx context.Context, rt ReturnType, ci storiface.CallID, ret storiface.WorkerReturn, res interface{}, rerr error) bool {
|
||||
for {
|
||||
err := returnFunc[rt](ctx, ci, ret, res, rerr)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
log.Errorf("return error, will retry in 5s: %s: %+v", rt, err)
|
||||
select {
|
||||
case <-time.After(5 * time.Second):
|
||||
case <-ctx.Done():
|
||||
log.Errorf("failed to return results: %s", ctx.Err())
|
||||
|
||||
// fine to just return, worker is most likely shutting down, and
|
||||
// we didn't mark the result as returned yet, so we'll try to
|
||||
// re-submit it on restart
|
||||
return false
|
||||
}
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
func errstr(err error) string {
|
||||
if err != nil {
|
||||
return err.Error()
|
||||
|
Loading…
Reference in New Issue
Block a user