From b8865fb182f700b04589d76e2324fdf90c131c43 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 22 Sep 2020 01:00:17 +0200 Subject: [PATCH] workers: Mark on-restart-failed returned tasks as returned --- extern/sector-storage/manager_test.go | 5 +++++ extern/sector-storage/teststorage_test.go | 4 ++-- extern/sector-storage/worker_calltracker.go | 6 +++--- extern/sector-storage/worker_local.go | 8 ++++++-- 4 files changed, 16 insertions(+), 7 deletions(-) diff --git a/extern/sector-storage/manager_test.go b/extern/sector-storage/manager_test.go index 9a47c3b55..d87ec0827 100644 --- a/extern/sector-storage/manager_test.go +++ b/extern/sector-storage/manager_test.go @@ -338,4 +338,9 @@ func TestRestartWorker(t *testing.T) { require.NoError(t, err) <-apDone + + time.Sleep(12 * time.Millisecond) + uf, err := w.ct.unfinished() + require.NoError(t, err) + require.Empty(t, uf) } diff --git a/extern/sector-storage/teststorage_test.go b/extern/sector-storage/teststorage_test.go index da575a491..0c8a240a3 100644 --- a/extern/sector-storage/teststorage_test.go +++ b/extern/sector-storage/teststorage_test.go @@ -15,7 +15,7 @@ import ( ) type apres struct { - pi abi.PieceInfo + pi abi.PieceInfo err error } @@ -78,4 +78,4 @@ func (t *testExec) ReadPiece(ctx context.Context, writer io.Writer, sector abi.S panic("implement me") } -var _ ffiwrapper.Storage = &testExec{} \ No newline at end of file +var _ ffiwrapper.Storage = &testExec{} diff --git a/extern/sector-storage/worker_calltracker.go b/extern/sector-storage/worker_calltracker.go index 38fb39ee1..1033822a5 100644 --- a/extern/sector-storage/worker_calltracker.go +++ b/extern/sector-storage/worker_calltracker.go @@ -29,9 +29,9 @@ type Call struct { func (wt *workerCallTracker) onStart(ci storiface.CallID, rt ReturnType) error { return wt.st.Begin(ci, &Call{ - ID: ci, - RetType:rt, - State: CallStarted, + ID: ci, + RetType: rt, + State: CallStarted, }) } diff --git a/extern/sector-storage/worker_local.go b/extern/sector-storage/worker_local.go index 009e11921..38b41ceb4 100644 --- a/extern/sector-storage/worker_local.go +++ b/extern/sector-storage/worker_local.go @@ -65,7 +65,7 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf st: cst, }, acceptTasks: acceptTasks, - executor: executor, + executor: executor, closing: make(chan struct{}), } @@ -86,11 +86,15 @@ func newLocalWorker(executor func() (ffiwrapper.Storage, error), wcfg WorkerConf if err := returnFunc[call.RetType](context.TODO(), call.ID, ret, nil, err); err != nil { log.Errorf("return error: %s: %+v", call.RetType, err) + continue + } + + if err := w.ct.onReturned(call.ID); err != nil { + log.Errorf("marking call as returned failed: %s: %+v", call.RetType, err) } } }() - return w }