Fix sealing sched tests

This commit is contained in:
Łukasz Magiera 2020-09-07 16:35:54 +02:00
parent 9e6f974f3c
commit 231a9e4051
7 changed files with 120 additions and 35 deletions

View File

@ -344,7 +344,7 @@ var runCmd = &cli.Command{
LocalWorker: sectorstorage.NewLocalWorker(sectorstorage.WorkerConfig{
SealProof: spt,
TaskTypes: taskTypes,
}, remote, localStore, nodeApi),
}, remote, localStore, nodeApi, nodeApi),
localStore: localStore,
ls: lr,
}

View File

@ -39,7 +39,7 @@ type LocalWorker struct {
acceptTasks map[sealtasks.TaskType]struct{}
}
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex) *LocalWorker {
func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local, sindex stores.SectorIndex, ret storiface.WorkerReturn) *LocalWorker {
acceptTasks := map[sealtasks.TaskType]struct{}{}
for _, taskType := range wcfg.TaskTypes {
acceptTasks[taskType] = struct{}{}
@ -52,6 +52,7 @@ func NewLocalWorker(wcfg WorkerConfig, store stores.Store, local *stores.Local,
storage: store,
localStore: local,
sindex: sindex,
ret: ret,
acceptTasks: acceptTasks,
}

View File

@ -119,6 +119,9 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
sched: newScheduler(cfg.SealProofType),
Prover: prover,
results: map[storiface.CallID]result{},
waitRes: map[storiface.CallID]chan struct{}{},
}
go m.sched.runSched()
@ -145,7 +148,7 @@ func New(ctx context.Context, ls stores.LocalStorage, si stores.SectorIndex, cfg
err = m.AddWorker(ctx, NewLocalWorker(WorkerConfig{
SealProof: cfg.SealProofType,
TaskTypes: localTasks,
}, stor, lstor, si))
}, stor, lstor, si, m))
if err != nil {
return nil, xerrors.Errorf("adding local worker: %w", err)
}
@ -528,7 +531,7 @@ func (m *Manager) waitResult(ctx context.Context) func(callID storiface.CallID,
}
}
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, err string) error {
func (m *Manager) returnResult(callID storiface.CallID, r interface{}, serr string) error {
m.resLk.Lock()
defer m.resLk.Unlock()
@ -537,9 +540,14 @@ func (m *Manager) returnResult(callID storiface.CallID, r interface{}, err strin
return xerrors.Errorf("result for call %v already reported")
}
var err error
if serr != "" {
err = errors.New(serr)
}
m.results[callID] = result{
r: r,
err: errors.New(err),
err: err,
}
close(m.waitRes[callID])

View File

@ -15,6 +15,7 @@ import (
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -109,6 +110,9 @@ func newTestMgr(ctx context.Context, t *testing.T) (*Manager, *stores.Local, *st
sched: newScheduler(cfg.SealProofType),
Prover: prover,
results: map[storiface.CallID]result{},
waitRes: map[storiface.CallID]chan struct{}{},
}
go m.sched.runSched()
@ -129,7 +133,7 @@ func TestSimple(t *testing.T) {
err := m.AddWorker(ctx, newTestWorker(WorkerConfig{
SealProof: abi.RegisteredSealProof_StackedDrg2KiBV1,
TaskTypes: localTasks,
}, lstor))
}, lstor, m))
require.NoError(t, err)
sid := abi.SectorID{Miner: 1000, Number: 1}

View File

@ -384,6 +384,50 @@ func (mgr *SectorMgr) CheckProvable(ctx context.Context, spt abi.RegisteredSealP
return bad, nil
}
func (mgr *SectorMgr) ReturnAddPiece(ctx context.Context, callID storiface.CallID, pi abi.PieceInfo, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealPreCommit1(ctx context.Context, callID storiface.CallID, p1o storage.PreCommit1Out, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealPreCommit2(ctx context.Context, callID storiface.CallID, sealed storage.SectorCids, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealCommit1(ctx context.Context, callID storiface.CallID, out storage.Commit1Out, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnSealCommit2(ctx context.Context, callID storiface.CallID, proof storage.Proof, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnFinalizeSector(ctx context.Context, callID storiface.CallID, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnReleaseUnsealed(ctx context.Context, callID storiface.CallID, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnMoveStorage(ctx context.Context, callID storiface.CallID, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnUnsealPiece(ctx context.Context, callID storiface.CallID, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnReadPiece(ctx context.Context, callID storiface.CallID, ok bool, err string) error {
panic("not supported")
}
func (mgr *SectorMgr) ReturnFetch(ctx context.Context, callID storiface.CallID, err string) error {
panic("not supported")
}
func (m mockVerif) VerifySeal(svi abi.SealVerifyInfo) (bool, error) {
if len(svi.Proof) != 32 { // Real ones are longer, but this should be fine
return false, nil

View File

@ -46,55 +46,55 @@ type schedTestWorker struct {
closing chan struct{}
}
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) error {
func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) error {
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error {
func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
panic("implement me")
}

View File

@ -4,6 +4,7 @@ import (
"context"
"io"
"github.com/google/uuid"
"github.com/ipfs/go-cid"
"github.com/filecoin-project/specs-actors/actors/abi"
@ -18,11 +19,12 @@ import (
type testWorker struct {
acceptTasks map[sealtasks.TaskType]struct{}
lstor *stores.Local
ret storiface.WorkerReturn
mockSeal *mock.SectorMgr
}
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local) *testWorker {
func newTestWorker(wcfg WorkerConfig, lstor *stores.Local, ret storiface.WorkerReturn) *testWorker {
ssize, err := wcfg.SealProof.SectorSize()
if err != nil {
panic(err)
@ -36,61 +38,87 @@ func newTestWorker(wcfg WorkerConfig, lstor *stores.Local) *testWorker {
return &testWorker{
acceptTasks: acceptTasks,
lstor: lstor,
ret: ret,
mockSeal: mock.NewMockSectorMgr(ssize, nil),
}
}
func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) {
return t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
func (t *testWorker) asyncCall(sector abi.SectorID, work func(ci storiface.CallID)) (storiface.CallID, error) {
ci := storiface.CallID{
Sector: sector,
ID: uuid.New(),
}
go work(ci)
return ci, nil
}
func (t *testWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p1o, err := t.mockSeal.SealPreCommit1(ctx, sector, ticket, pieces)
if err := t.ret.ReturnSealPreCommit1(ctx, ci, p1o, errstr(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) NewSector(ctx context.Context, sector abi.SectorID) error {
panic("implement me")
}
func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error {
func (t *testWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (bool, error) {
func (t *testWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) {
return t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
func (t *testWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
p, err := t.mockSeal.AddPiece(ctx, sector, pieceSizes, newPieceSize, pieceData)
if err := t.ret.ReturnAddPiece(ctx, ci, p, errstr(err)); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) {
func (t *testWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) {
func (t *testWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) {
func (t *testWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error {
func (t *testWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error {
func (t *testWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) error {
func (t *testWorker) Remove(ctx context.Context, sector abi.SectorID) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) error {
func (t *testWorker) MoveStorage(ctx context.Context, sector abi.SectorID, types storiface.SectorFileType) (storiface.CallID, error) {
panic("implement me")
}
func (t *testWorker) Fetch(ctx context.Context, id abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) error {
return nil
func (t *testWorker) Fetch(ctx context.Context, sector abi.SectorID, fileType storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
return t.asyncCall(sector, func(ci storiface.CallID) {
if err := t.ret.ReturnFetch(ctx, ci, ""); err != nil {
log.Error(err)
}
})
}
func (t *testWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {