workers: Mark on-restart-failed returned tasks as returned

This commit is contained in:
Łukasz Magiera 2020-09-22 01:00:17 +02:00
parent 03c3d8bdb3
commit b8865fb182
4 changed files with 16 additions and 7 deletions

View File

@ -338,4 +338,9 @@ func TestRestartWorker(t *testing.T) {
require.NoError(t, err) require.NoError(t, err)
<-apDone <-apDone
time.Sleep(12 * time.Millisecond)
uf, err := w.ct.unfinished()
require.NoError(t, err)
require.Empty(t, uf)
} }

View File

@ -15,7 +15,7 @@ import (
) )
type apres struct { type apres struct {
pi abi.PieceInfo pi abi.PieceInfo
err error err error
} }

View File

@ -29,9 +29,9 @@ type Call struct {
func (wt *workerCallTracker) onStart(ci storiface.CallID, rt ReturnType) error { func (wt *workerCallTracker) onStart(ci storiface.CallID, rt ReturnType) error {
return wt.st.Begin(ci, &Call{ return wt.st.Begin(ci, &Call{
ID: ci, ID: ci,
RetType:rt, RetType: rt,
State: CallStarted, State: CallStarted,
}) })
} }

View File

@ -65,7 +65,7 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf
st: cst, st: cst,
}, },
acceptTasks: acceptTasks, acceptTasks: acceptTasks,
executor: executor, executor: executor,
closing: make(chan struct{}), closing: make(chan struct{}),
} }
@ -86,11 +86,15 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf
if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil { if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil {
log.Errorf("return error: %s: %+v", call.RetType, err) log.Errorf("return error: %s: %+v", call.RetType, err)
continue
}
if err := w.ct.onReturned(call.ID); err != nil {
log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err)
} }
} }
}() }()
return w return w
} }