Merge pull request #4645 from filecoin-project/fix/miner-restart-edgecase

sectorstorage: Fix manager restart edge-case
This commit is contained in:
Łukasz Magiera 2020-10-29 18:07:18 +01:00 committed by GitHub
commit dc5d6316c9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 118 additions and 121 deletions

View File

@ -251,19 +251,7 @@ func (m *Manager) waitWork(ctx context.Context, wid WorkID) (interface{}, error)
return nil, xerrors.Errorf("something else in waiting on callRes")
}
ch, ok := m.waitRes[wid]
if !ok {
ch = make(chan struct{})
m.waitRes[wid] = ch
}
m.workLk.Unlock()
select {
case <-ch:
m.workLk.Lock()
defer m.workLk.Unlock()
res := m.results[wid]
done := func() {
delete(m.results, wid)
_, ok := m.callToWork[ws.WorkerCall]
@ -276,6 +264,32 @@ func (m *Manager) waitWork(ctx context.Context, wid WorkID) (interface{}, error)
// Not great, but not worth discarding potentially multi-hour computation over this
log.Errorf("marking work as done: %+v", err)
}
}
// the result can already be there if the work was running, manager restarted,
// and the worker has delivered the result before we entered waitWork
res, ok := m.results[wid]
if ok {
done()
m.workLk.Unlock()
return res.r, res.err
}
ch, ok := m.waitRes[wid]
if !ok {
ch = make(chan struct{})
m.waitRes[wid] = ch
}
m.workLk.Unlock()
select {
case <-ch:
m.workLk.Lock()
defer m.workLk.Unlock()
res := m.results[wid]
done()
return res.r, res.err
case <-ctx.Done():

View File

@ -210,76 +210,100 @@ func TestRedoPC1(t *testing.T) {
// Manager restarts in the middle of a task, restarts it, it completes
func TestRestartManager(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
test := func(returnBeforeCall bool) func(*testing.T) {
return func(t *testing.T) {
logging.SetAllLoggers(logging.LevelDebug)
ctx, done := context.WithCancel(context.Background())
defer done()
ctx, done := context.WithCancel(context.Background())
defer done()
ds := datastore.NewMapDatastore()
ds := datastore.NewMapDatastore()
m, lstor, _, _, cleanup := newTestMgr(ctx, t, ds)
defer cleanup()
m, lstor, _, _, cleanup := newTestMgr(ctx, t, ds)
defer cleanup()
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
localTasks := []sealtasks.TaskType{
sealtasks.TTAddPiece, sealtasks.TTPreCommit1, sealtasks.TTCommit1, sealtasks.TTFinalize, sealtasks.TTFetch,
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
pieces := []abi.PieceInfo{pi, piz}
ticket := abi.SealRandomness{0, 9, 9, 9, 9, 9, 9, 9}
tw.pc1lk.Lock()
tw.pc1wait = &sync.WaitGroup{}
tw.pc1wait.Add(1)
var cwg sync.WaitGroup
cwg.Add(1)
var perr error
go func() {
defer cwg.Done()
_, perr = m.SealPreCommit1(ctx, sid, ticket, pieces)
}()
tw.pc1wait.Wait()
require.NoError(t, m.Close(ctx))
tw.ret = nil
cwg.Wait()
require.Error(t, perr)
m, _, _, _, cleanup2 := newTestMgr(ctx, t, ds)
defer cleanup2()
tw.ret = m // simulate jsonrpc auto-reconnect
err = m.AddWorker(ctx, tw)
require.NoError(t, err)
if returnBeforeCall {
tw.pc1lk.Unlock()
time.Sleep(100 * time.Millisecond)
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
} else {
done := make(chan struct{})
go func() {
defer close(done)
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
}()
time.Sleep(100 * time.Millisecond)
tw.pc1lk.Unlock()
<-done
}
require.NoError(t, err)
require.Equal(t, 1, tw.pc1s)
ws := m.WorkerJobs()
require.Empty(t, ws)
}
}
tw := newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor, m)
err := m.AddWorker(ctx, tw)
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}
pi, err := m.AddPiece(ctx, sid, nil, 1016, strings.NewReader(strings.Repeat("testthis", 127)))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), pi.Size)
piz, err := m.AddPiece(ctx, sid, nil, 1016, bytes.NewReader(make([]byte, 1016)[:]))
require.NoError(t, err)
require.Equal(t, abi.PaddedPieceSize(1024), piz.Size)
pieces := []abi.PieceInfo{pi, piz}
ticket := abi.SealRandomness{0, 9, 9, 9, 9, 9, 9, 9}
tw.pc1lk.Lock()
tw.pc1wait = &sync.WaitGroup{}
tw.pc1wait.Add(1)
var cwg sync.WaitGroup
cwg.Add(1)
var perr error
go func() {
defer cwg.Done()
_, perr = m.SealPreCommit1(ctx, sid, ticket, pieces)
}()
tw.pc1wait.Wait()
require.NoError(t, m.Close(ctx))
tw.ret = nil
cwg.Wait()
require.Error(t, perr)
m, _, _, _, cleanup2 := newTestMgr(ctx, t, ds)
defer cleanup2()
tw.ret = m // simulate jsonrpc auto-reconnect
err = m.AddWorker(ctx, tw)
require.NoError(t, err)
tw.pc1lk.Unlock()
_, err = m.SealPreCommit1(ctx, sid, ticket, pieces)
require.NoError(t, err)
require.Equal(t, 1, tw.pc1s)
t.Run("callThenReturn", test(false))
t.Run("returnThenCall", test(true))
}
// Worker restarts in the middle of a task, task fails after restart

View File

@ -2,14 +2,11 @@ package sectorstorage
import (
"context"
"io"
"sync"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/go-state-types/abi"
"github.com/filecoin-project/specs-storage/storage"
"github.com/google/uuid"
"github.com/filecoin-project/lotus/extern/sector-storage/mock"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
@ -29,6 +26,8 @@ type testWorker struct {
pc1wait *sync.WaitGroup
session uuid.UUID
Worker
}
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
@ -64,18 +63,6 @@ func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallI
return ci, nil
}
func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
@ -103,34 +90,6 @@ func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ti
})
}
func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil {