diff --git a/manager.go b/manager.go index 0c18645ac..0cd081d92 100644 --- a/manager.go +++ b/manager.go @@ -208,7 +208,7 @@ func (m *Manager) ReadPiece(ctx context.Context, sink io.Writer, sector abi.Sect var selector WorkerSelector if len(best) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // append to existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -269,7 +269,7 @@ func (m *Manager) AddPiece(ctx context.Context, sector abi.SectorID, existingPie var selector WorkerSelector var err error if len(existingPieces) == 0 { // new - selector, err = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) + selector = newAllocSelector(ctx, m.index, stores.FTUnsealed, stores.PathSealing) } else { // use existing selector, err = newExistingSelector(ctx, m.index, sector, stores.FTUnsealed, false) } @@ -300,10 +300,7 @@ func (m *Manager) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticke // TODO: also consider where the unsealed data sits - selector, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) - if err != nil { - return nil, xerrors.Errorf("creating path selector: %w", err) - } + selector := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathSealing) err = m.sched.Schedule(ctx, sector, sealtasks.TTPreCommit1, selector, schedFetch(sector, stores.FTUnsealed, stores.PathSealing, stores.AcquireMove), func(ctx context.Context, w Worker) error { p, err := w.SealPreCommit1(ctx, sector, ticket, pieces) @@ -417,11 +414,7 @@ func (m *Manager) FinalizeSector(ctx context.Context, sector abi.SectorID, keepU return err } - fetchSel, err := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) - if err != nil { - return xerrors.Errorf("creating fetchSel: %w", err) - } - + fetchSel := newAllocSelector(ctx, m.index, stores.FTCache|stores.FTSealed, stores.PathStorage) moveUnsealed := unsealed { if len(keepUnsealed) == 0 { diff --git a/sched_test.go b/sched_test.go index d0d0e7ca9..e810b6a0d 100644 --- a/sched_test.go +++ b/sched_test.go @@ -2,9 +2,21 @@ package sectorstorage import ( "context" + "io" + "sync" "testing" + "time" + "github.com/ipfs/go-cid" "github.com/stretchr/testify/require" + + "github.com/filecoin-project/specs-actors/actors/abi" + + "github.com/filecoin-project/sector-storage/fsutil" + "github.com/filecoin-project/sector-storage/sealtasks" + "github.com/filecoin-project/sector-storage/stores" + "github.com/filecoin-project/sector-storage/storiface" + "github.com/filecoin-project/specs-storage/storage" ) func TestWithPriority(t *testing.T) { @@ -16,3 +28,399 @@ func TestWithPriority(t *testing.T) { require.Equal(t, 2222, getPriority(ctx)) } + +type schedTestWorker struct { + name string + taskTypes map[sealtasks.TaskType]struct{} + paths []stores.StoragePath + + closed bool + closing chan struct{} +} + +func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storage.PreCommit1Out, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector abi.SectorID, pc1o storage.PreCommit1Out) (storage.SectorCids, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealCommit1(ctx context.Context, sector abi.SectorID, ticket abi.SealRandomness, seed 
abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storage.Commit1Out, error) { + panic("implement me") +} + +func (s *schedTestWorker) SealCommit2(ctx context.Context, sector abi.SectorID, c1o storage.Commit1Out) (storage.Proof, error) { + panic("implement me") +} + +func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector abi.SectorID, keepUnsealed []storage.Range) error { + panic("implement me") +} + +func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector abi.SectorID, safeToFree []storage.Range) error { + panic("implement me") +} + +func (s *schedTestWorker) Remove(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) NewSector(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) AddPiece(ctx context.Context, sector abi.SectorID, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (abi.PieceInfo, error) { + panic("implement me") +} + +func (s *schedTestWorker) MoveStorage(ctx context.Context, sector abi.SectorID) error { + panic("implement me") +} + +func (s *schedTestWorker) Fetch(ctx context.Context, id abi.SectorID, ft stores.SectorFileType, ptype stores.PathType, am stores.AcquireMode) error { + panic("implement me") +} + +func (s *schedTestWorker) UnsealPiece(ctx context.Context, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) error { + panic("implement me") +} + +func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id abi.SectorID, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) error { + panic("implement me") +} + +func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) { + return s.taskTypes, nil +} + +func (s *schedTestWorker) Paths(ctx context.Context) ([]stores.StoragePath, error) { + return s.paths, nil +} + +func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) { + return storiface.WorkerInfo{ + Hostname: s.name, + Resources: storiface.WorkerResources{ + MemPhysical: 128 << 30, + MemSwap: 200 << 30, + MemReserved: 2 << 30, + CPUs: 32, + GPUs: []string{"a GPU"}, + }, + }, nil +} + +func (s *schedTestWorker) Closing(ctx context.Context) (<-chan struct{}, error) { + return s.closing, nil +} + +func (s *schedTestWorker) Close() error { + if !s.closed { + s.closed = true + close(s.closing) + } + return nil +} + +var _ Worker = &schedTestWorker{} + +func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}) { + w := &schedTestWorker{ + name: name, + taskTypes: taskTypes, + paths: []stores.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "food", CanSeal: true, CanStore: true}}, + + closing: make(chan struct{}), + } + + for _, path := range w.paths { + err := index.StorageAttach(context.TODO(), stores.StorageInfo{ + ID: path.ID, + URLs: nil, + Weight: path.Weight, + CanSeal: path.CanSeal, + CanStore: path.CanStore, + }, fsutil.FsStat{ + Capacity: 1 << 40, + Available: 1 << 40, + Reserved: 3, + }) + require.NoError(t, err) + } + + info, err := w.Info(context.TODO()) + require.NoError(t, err) + + sched.newWorkers <- &workerHandle{ + w: w, + info: info, + preparing: &activeResources{}, + active: &activeResources{}, + } +} + +func TestSchedStartStop(t *testing.T) { + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + sched := 
newScheduler(spt) + go sched.runSched() + + addTestWorker(t, sched, stores.NewIndex(), "fred", nil) + + sched.schedClose() +} + +func TestSched(t *testing.T) { + ctx := context.Background() + spt := abi.RegisteredSealProof_StackedDrg32GiBV1 + + sectorAte := abi.SectorID{ + Miner: 8, + Number: 8, + } + + type workerSpec struct { + name string + taskTypes map[sealtasks.TaskType]struct{} + } + + noopPrepare := func(ctx context.Context, w Worker) error { + return nil + } + + type runMeta struct { + done map[string]chan struct{} + + wg sync.WaitGroup + } + + type task func(*testing.T, *scheduler, *stores.Index, *runMeta) + + sched := func(taskName, expectWorker string, taskType sealtasks.TaskType) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + done := make(chan struct{}) + rm.done[taskName] = done + + sel := newAllocSelector(ctx, index, stores.FTCache, stores.PathSealing) + + rm.wg.Add(1) + go func() { + defer rm.wg.Done() + + err := sched.Schedule(ctx, sectorAte, taskType, sel, noopPrepare, func(ctx context.Context, w Worker) error { + wi, err := w.Info(ctx) + require.NoError(t, err) + + require.Equal(t, expectWorker, wi.Hostname) + + log.Info("IN ", taskName) + + for { + _, ok := <-done + if !ok { + break + } + } + + log.Info("OUT ", taskName) + + return nil + }) + require.NoError(t, err) + }() + } + } + + taskStarted := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + rm.done[name] <- struct{}{} + } + } + + taskDone := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + rm.done[name] <- struct{}{} + close(rm.done[name]) + } + } + + taskNotScheduled := func(name string) task { + return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) { + select { + case rm.done[name] <- struct{}{}: + t.Fatal("not expected") + case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy + } + } + } + + testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) { + return func(t *testing.T) { + index := stores.NewIndex() + + sched := newScheduler(spt) + go sched.runSched() + + for _, worker := range workers { + addTestWorker(t, sched, index, worker.name, worker.taskTypes) + } + + rm := runMeta{ + done: map[string]chan struct{}{}, + } + + for _, task := range tasks { + task(t, sched, index, &rm) + } + + log.Info("wait for async stuff") + rm.wg.Wait() + + sched.schedClose() + } + } + + multTask := func(tasks ...task) task { + return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) { + for _, tsk := range tasks { + tsk(t, s, index, meta) + } + } + } + + t.Run("one-pc1", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("pc1-1", "fred", sealtasks.TTPreCommit1), + taskDone("pc1-1"), + })) + + t.Run("pc1-2workers-1", testFunc([]workerSpec{ + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + taskDone("pc1-1"), + })) + + t.Run("pc1-2workers-2", testFunc([]workerSpec{ + {name: "fred1", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + {name: "fred2", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc1-1", "fred1", sealtasks.TTPreCommit1), + 
taskDone("pc1-1"), + })) + + t.Run("pc1-block-pc2", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc1", "fred", sealtasks.TTPreCommit1), + taskStarted("pc1"), + + sched("pc2", "fred", sealtasks.TTPreCommit2), + taskNotScheduled("pc2"), + + taskDone("pc1"), + taskDone("pc2"), + })) + + t.Run("pc2-block-pc1", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2:{}}}, + }, []task{ + sched("pc2", "fred", sealtasks.TTPreCommit2), + taskStarted("pc2"), + + sched("pc1", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("pc1"), + + taskDone("pc2"), + taskDone("pc1"), + })) + + t.Run("pc1-batching", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}}}, + }, []task{ + sched("t1", "fred", sealtasks.TTPreCommit1), + taskStarted("t1"), + + sched("t2", "fred", sealtasks.TTPreCommit1), + taskStarted("t2"), + + // with worker settings, we can only run 2 parallel PC1s + + // start 2 more to fill fetch buffer + + sched("t3", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t3"), + + sched("t4", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t4"), + + taskDone("t1"), + taskDone("t2"), + + taskStarted("t3"), + taskStarted("t4"), + + taskDone("t3"), + taskDone("t4"), + })) + + twoPC1 := func(prefix string, schedAssert func(name string) task) task { + return multTask( + sched(prefix + "-a", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix + "-a"), + + sched(prefix + "-b", "fred", sealtasks.TTPreCommit1), + schedAssert(prefix + "-b"), + ) + } + + twoPC1Done := func(prefix string) task { + return multTask( + taskDone(prefix + "-1"), + taskDone(prefix + "-b"), + ) + } + + t.Run("pc1-pc2-prio", testFunc([]workerSpec{ + {name: "fred", taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1:{}, sealtasks.TTPreCommit2: {}}}, + }, []task{ + // fill exec/fetch buffers + twoPC1("w0", taskStarted), + twoPC1("w1", taskNotScheduled), + + // fill worker windows + twoPC1("w2", taskNotScheduled), + twoPC1("w3", taskNotScheduled), + + // windowed + + sched("t1", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t1"), + + sched("t2", "fred", sealtasks.TTPreCommit1), + taskNotScheduled("t2"), + + sched("t3", "fred", sealtasks.TTPreCommit2), + taskNotScheduled("t3"), + + twoPC1Done("w0"), + twoPC1Done("w1"), + twoPC1Done("w2"), + twoPC1Done("w3"), + + taskStarted("t1"), + taskNotScheduled("t2"), + taskNotScheduled("t3"), + + taskDone("t1"), + + taskStarted("t2"), + taskStarted("t3"), + + taskDone("t2"), + taskDone("t3"), + })) +} + diff --git a/selector_alloc.go b/selector_alloc.go index 874bf7bb0..53e121737 100644 --- a/selector_alloc.go +++ b/selector_alloc.go @@ -17,12 +17,12 @@ type allocSelector struct { ptype stores.PathType } -func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector, error) { +func newAllocSelector(ctx context.Context, index stores.SectorIndex, alloc stores.SectorFileType, ptype stores.PathType) (*allocSelector) { return &allocSelector{ index: index, alloc: alloc, ptype: ptype, - }, nil + } } func (s *allocSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, whnd *workerHandle) (bool, error) {