lotus/extern/sector-storage/sched_test.go

719 lines
21 KiB
Go
Raw Normal View History

2021-12-15 14:30:42 +00:00
//stm: #unit
package sectorstorage
2020-06-24 21:06:56 +00:00
import (
"context"
2020-07-16 23:26:55 +00:00
"fmt"
2020-07-16 21:41:04 +00:00
"io"
2020-07-16 23:26:55 +00:00
"runtime"
"sort"
2020-07-16 21:41:04 +00:00
"sync"
2020-06-24 21:06:56 +00:00
"testing"
2020-07-16 21:41:04 +00:00
"time"
2020-06-24 21:06:56 +00:00
2022-04-20 21:34:28 +00:00
prooftypes "github.com/filecoin-project/go-state-types/proof"
"github.com/google/uuid"
2020-07-16 21:41:04 +00:00
"github.com/ipfs/go-cid"
logging "github.com/ipfs/go-log/v2"
2020-06-24 21:06:56 +00:00
"github.com/stretchr/testify/require"
2020-07-16 21:41:04 +00:00
2020-09-07 03:49:10 +00:00
"github.com/filecoin-project/go-state-types/abi"
2020-07-16 21:41:04 +00:00
"github.com/filecoin-project/lotus/extern/sector-storage/fsutil"
"github.com/filecoin-project/lotus/extern/sector-storage/sealtasks"
"github.com/filecoin-project/lotus/extern/sector-storage/stores"
"github.com/filecoin-project/lotus/extern/sector-storage/storiface"
2020-07-16 21:41:04 +00:00
"github.com/filecoin-project/specs-storage/storage"
2020-06-24 21:06:56 +00:00
)
2020-08-28 14:33:41 +00:00
// init overrides the package-level InitWait with 10ms when the test binary
// starts, so every test in this package runs with that value.
func init() {
InitWait = 10 * time.Millisecond
}
2020-06-24 21:06:56 +00:00
// TestWithPriority checks that getPriority reports DefaultSchedPriority for a
// plain context and the explicit value once WithPriority has been applied.
func TestWithPriority(t *testing.T) {
	base := context.Background()
	require.Equal(t, DefaultSchedPriority, getPriority(base))

	prioritized := WithPriority(base, 2222)
	require.Equal(t, 2222, getPriority(prioritized))
}
2020-07-16 21:41:04 +00:00
var decentWorkerResources = storiface.WorkerResources{
MemPhysical: 128 << 30,
MemSwap: 200 << 30,
MemUsed: 1 << 30,
MemSwapUsed: 1 << 30,
CPUs: 32,
2021-11-29 14:25:01 +00:00
GPUs: []string{},
}
// constrainedWorkerResources describes a worker with a single CPU whose
// MemUsed already equals MemPhysical (1<<30 each); MemSwap and GPUs are left
// at their zero values. TestSched uses it in the "constrained-resources" case.
var constrainedWorkerResources = storiface.WorkerResources{
MemPhysical: 1 << 30,
MemUsed: 1 << 30,
MemSwapUsed: 1 << 30,
CPUs: 1,
}
2020-07-16 21:41:04 +00:00
type schedTestWorker struct {
2020-07-16 21:41:15 +00:00
name string
2020-07-16 21:41:04 +00:00
taskTypes map[sealtasks.TaskType]struct{}
2022-01-18 10:57:04 +00:00
paths []storiface.StoragePath
2020-07-16 21:41:04 +00:00
2020-07-16 21:41:15 +00:00
closed bool
session uuid.UUID
resources storiface.WorkerResources
ignoreResources bool
2020-07-16 21:41:04 +00:00
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) SealPreCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, pieces []abi.PieceInfo) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) SealPreCommit2(ctx context.Context, sector storage.SectorRef, pc1o storage.PreCommit1Out) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) SealCommit1(ctx context.Context, sector storage.SectorRef, ticket abi.SealRandomness, seed abi.InteractiveSealRandomness, pieces []abi.PieceInfo, cids storage.SectorCids) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) SealCommit2(ctx context.Context, sector storage.SectorRef, c1o storage.Commit1Out) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) FinalizeSector(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) ReleaseUnsealed(ctx context.Context, sector storage.SectorRef, safeToFree []storage.Range) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) Remove(ctx context.Context, sector storage.SectorRef) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) NewSector(ctx context.Context, sector storage.SectorRef) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) AddPiece(ctx context.Context, sector storage.SectorRef, pieceSizes []abi.UnpaddedPieceSize, newPieceSize abi.UnpaddedPieceSize, pieceData storage.Data) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
// ReplicaUpdate is not implemented by the test worker and panics if called.
func (s *schedTestWorker) ReplicaUpdate(ctx context.Context, sector storage.SectorRef, peices []abi.PieceInfo) (storiface.CallID, error) {
panic("implement me")
}
// ProveReplicaUpdate1 is not implemented by the test worker and panics if called.
func (s *schedTestWorker) ProveReplicaUpdate1(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
// ProveReplicaUpdate2 is not implemented by the test worker and panics if called.
func (s *schedTestWorker) ProveReplicaUpdate2(ctx context.Context, sector storage.SectorRef, sectorKey, newSealed, newUnsealed cid.Cid, vanillaProofs storage.ReplicaVanillaProofs) (storiface.CallID, error) {
panic("implement me")
}
// GenerateSectorKeyFromData is not implemented by the test worker and panics if called.
func (s *schedTestWorker) GenerateSectorKeyFromData(ctx context.Context, sector storage.SectorRef, commD cid.Cid) (storiface.CallID, error) {
panic("implement me")
}
// FinalizeReplicaUpdate is not implemented by the test worker and panics if called.
func (s *schedTestWorker) FinalizeReplicaUpdate(ctx context.Context, sector storage.SectorRef, keepUnsealed []storage.Range) (storiface.CallID, error) {
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) MoveStorage(ctx context.Context, sector storage.SectorRef, types storiface.SectorFileType) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) Fetch(ctx context.Context, id storage.SectorRef, ft storiface.SectorFileType, ptype storiface.PathType, am storiface.AcquireMode) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) UnsealPiece(ctx context.Context, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize, randomness abi.SealRandomness, cid cid.Cid) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2020-11-05 06:34:24 +00:00
func (s *schedTestWorker) ReadPiece(ctx context.Context, writer io.Writer, id storage.SectorRef, index storiface.UnpaddedByteIndex, size abi.UnpaddedPieceSize) (storiface.CallID, error) {
2020-07-16 21:41:04 +00:00
panic("implement me")
}
2022-04-20 21:34:28 +00:00
// GenerateWinningPoSt is not implemented by the test worker and panics if called.
func (s *schedTestWorker) GenerateWinningPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, randomness abi.PoStRandomness) ([]prooftypes.PoStProof, error) {
panic("implement me")
}
2022-01-14 13:11:04 +00:00
// GenerateWindowPoSt is not implemented by the test worker and panics if called.
func (s *schedTestWorker) GenerateWindowPoSt(ctx context.Context, ppt abi.RegisteredPoStProof, mid abi.ActorID, sectors []storiface.PostSectorChallenge, partitionIdx int, randomness abi.PoStRandomness) (storiface.WindowPoStResult, error) {
panic("implement me")
}
2020-07-16 21:41:04 +00:00
// TaskTypes returns the task-type set the worker was constructed with; the
// error is always nil.
func (s *schedTestWorker) TaskTypes(ctx context.Context) (map[sealtasks.TaskType]struct{}, error) {
return s.taskTypes, nil
}
2022-01-18 10:57:04 +00:00
func (s *schedTestWorker) Paths(ctx context.Context) ([]storiface.StoragePath, error) {
2020-07-16 21:41:04 +00:00
return s.paths, nil
}
func (s *schedTestWorker) Info(ctx context.Context) (storiface.WorkerInfo, error) {
return storiface.WorkerInfo{
Hostname: s.name,
IgnoreResources: s.ignoreResources,
Resources: s.resources,
2020-07-16 21:41:04 +00:00
}, nil
}
func (s *schedTestWorker) Session(context.Context) (uuid.UUID, error) {
return s.session, nil
2020-07-16 21:41:04 +00:00
}
func (s *schedTestWorker) Close() error {
if !s.closed {
2020-07-17 10:59:12 +00:00
log.Info("close schedTestWorker")
2020-07-16 21:41:04 +00:00
s.closed = true
s.session = uuid.UUID{}
2020-07-16 21:41:04 +00:00
}
return nil
}
// Compile-time check that *schedTestWorker satisfies the Worker interface.
var _ Worker = &schedTestWorker{}
func addTestWorker(t *testing.T, sched *scheduler, index *stores.Index, name string, taskTypes map[sealtasks.TaskType]struct{}, resources storiface.WorkerResources, ignoreResources bool) {
2020-07-16 21:41:04 +00:00
w := &schedTestWorker{
2020-07-16 21:41:15 +00:00
name: name,
2020-07-16 21:41:04 +00:00
taskTypes: taskTypes,
2022-01-18 10:57:04 +00:00
paths: []storiface.StoragePath{{ID: "bb-8", Weight: 2, LocalPath: "<octopus>food</octopus>", CanSeal: true, CanStore: true}},
2020-07-16 21:41:04 +00:00
session: uuid.New(),
resources: resources,
ignoreResources: ignoreResources,
2020-07-16 21:41:04 +00:00
}
for _, path := range w.paths {
2022-01-18 10:57:04 +00:00
err := index.StorageAttach(context.TODO(), storiface.StorageInfo{
2020-07-16 21:41:15 +00:00
ID: path.ID,
URLs: nil,
Weight: path.Weight,
CanSeal: path.CanSeal,
CanStore: path.CanStore,
2020-07-16 21:41:04 +00:00
}, fsutil.FsStat{
Capacity: 1 << 40,
Available: 1 << 40,
FSAvailable: 1 << 40,
Reserved: 3,
2020-07-16 21:41:04 +00:00
})
require.NoError(t, err)
}
2022-01-14 13:11:04 +00:00
sessID, err := w.Session(context.TODO())
require.NoError(t, err)
wid := storiface.WorkerID(sessID)
wh, err := newWorkerHandle(context.TODO(), w)
require.NoError(t, err)
require.NoError(t, sched.runWorker(context.TODO(), wid, wh))
2020-07-16 21:41:04 +00:00
}
func TestSchedStartStop(t *testing.T) {
2020-11-05 06:34:24 +00:00
sched := newScheduler()
2020-07-16 21:41:04 +00:00
go sched.runSched()
addTestWorker(t, sched, stores.NewIndex(), "fred", nil, decentWorkerResources, false)
2020-07-16 21:41:04 +00:00
2020-07-17 10:59:12 +00:00
require.NoError(t, sched.Close(context.TODO()))
2020-07-16 21:41:04 +00:00
}
func TestSched(t *testing.T) {
2021-12-15 14:30:42 +00:00
//stm: @WORKER_JOBS_001
2021-11-29 14:25:01 +00:00
storiface.ParallelNum = 1
storiface.ParallelDenom = 1
2020-07-17 10:59:12 +00:00
ctx, done := context.WithTimeout(context.Background(), 30*time.Second)
2020-07-16 23:26:55 +00:00
defer done()
2020-07-16 21:41:04 +00:00
2020-07-16 23:26:55 +00:00
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
2020-07-16 21:41:04 +00:00
type workerSpec struct {
2020-07-16 21:41:15 +00:00
name string
2020-07-16 21:41:04 +00:00
taskTypes map[sealtasks.TaskType]struct{}
resources storiface.WorkerResources
ignoreResources bool
2020-07-16 21:41:04 +00:00
}
2020-07-16 23:32:49 +00:00
noopAction := func(ctx context.Context, w Worker) error {
2020-07-16 21:41:04 +00:00
return nil
}
type runMeta struct {
done map[string]chan struct{}
wg sync.WaitGroup
}
type task func(*testing.T, *scheduler, *stores.Index, *runMeta)
2020-07-16 23:26:55 +00:00
sched := func(taskName, expectWorker string, sid abi.SectorNumber, taskType sealtasks.TaskType) task {
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
2020-07-16 21:41:04 +00:00
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
done := make(chan struct{})
rm.done[taskName] = done
2020-09-06 16:54:00 +00:00
sel := newAllocSelector(index, storiface.FTCache, storiface.PathSealing)
2020-07-16 21:41:04 +00:00
rm.wg.Add(1)
go func() {
defer rm.wg.Done()
2020-11-05 06:34:24 +00:00
sectorRef := storage.SectorRef{
ID: abi.SectorID{
Miner: 8,
Number: sid,
},
ProofType: spt,
2020-07-16 23:26:55 +00:00
}
2020-11-05 06:34:24 +00:00
err := sched.Schedule(ctx, sectorRef, taskType, sel, func(ctx context.Context, w Worker) error {
2020-07-16 21:41:04 +00:00
wi, err := w.Info(ctx)
require.NoError(t, err)
require.Equal(t, expectWorker, wi.Hostname)
log.Info("IN ", taskName)
for {
_, ok := <-done
if !ok {
break
}
}
log.Info("OUT ", taskName)
return nil
2020-07-16 23:32:49 +00:00
}, noopAction)
2021-11-29 14:25:01 +00:00
if err != context.Canceled {
require.NoError(t, err, fmt.Sprint(l, l2))
}
2020-07-16 21:41:04 +00:00
}()
2020-07-16 23:26:55 +00:00
<-sched.testSync
2020-07-16 21:41:04 +00:00
}
}
taskStarted := func(name string) task {
2020-07-16 23:26:55 +00:00
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
2020-07-16 21:41:04 +00:00
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
2020-07-16 23:26:55 +00:00
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
2020-07-16 21:41:04 +00:00
}
}
taskDone := func(name string) task {
2020-07-16 23:26:55 +00:00
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
2020-07-16 21:41:04 +00:00
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
2020-07-16 23:26:55 +00:00
select {
case rm.done[name] <- struct{}{}:
case <-ctx.Done():
t.Fatal("ctx error", ctx.Err(), l, l2)
}
2020-07-16 21:41:04 +00:00
close(rm.done[name])
}
}
taskNotScheduled := func(name string) task {
2020-07-16 23:26:55 +00:00
_, _, l, _ := runtime.Caller(1)
_, _, l2, _ := runtime.Caller(2)
2020-07-16 21:41:04 +00:00
return func(t *testing.T, sched *scheduler, index *stores.Index, rm *runMeta) {
select {
case rm.done[name] <- struct{}{}:
2020-07-16 23:26:55 +00:00
t.Fatal("not expected", l, l2)
2020-07-16 21:41:04 +00:00
case <-time.After(10 * time.Millisecond): // TODO: better synchronization thingy
}
}
}
testFunc := func(workers []workerSpec, tasks []task) func(t *testing.T) {
return func(t *testing.T) {
index := stores.NewIndex()
2020-11-05 06:34:24 +00:00
sched := newScheduler()
2020-07-16 23:26:55 +00:00
sched.testSync = make(chan struct{})
2020-07-16 21:41:04 +00:00
go sched.runSched()
for _, worker := range workers {
addTestWorker(t, sched, index, worker.name, worker.taskTypes, worker.resources, worker.ignoreResources)
2020-07-16 21:41:04 +00:00
}
rm := runMeta{
done: map[string]chan struct{}{},
}
for i, task := range tasks {
log.Info("TASK", i)
2020-07-16 21:41:04 +00:00
task(t, sched, index, &rm)
}
log.Info("wait for async stuff")
rm.wg.Wait()
2020-07-17 10:59:12 +00:00
require.NoError(t, sched.Close(context.TODO()))
2020-07-16 21:41:04 +00:00
}
}
multTask := func(tasks ...task) task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
for _, tsk := range tasks {
tsk(t, s, index, meta)
}
}
}
2021-06-21 19:49:24 +00:00
// checks behaviour with workers with constrained resources
// the first one is not ignoring resource constraints, so we assign to the second worker, who is
t.Run("constrained-resources", testFunc([]workerSpec{
{name: "fred1", resources: constrainedWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", resources: constrainedWorkerResources, ignoreResources: true, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
}, []task{
2021-06-21 19:49:24 +00:00
sched("pc1-1", "fred2", 8, sealtasks.TTPreCommit1),
taskStarted("pc1-1"),
2021-06-21 19:49:24 +00:00
taskDone("pc1-1"),
}))
2020-07-16 21:41:04 +00:00
t.Run("one-pc1", testFunc([]workerSpec{
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("pc1-1", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskDone("pc1-1"),
}))
t.Run("pc1-2workers-1", testFunc([]workerSpec{
{name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
{name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskDone("pc1-1"),
}))
t.Run("pc1-2workers-2", testFunc([]workerSpec{
{name: "fred1", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
{name: "fred2", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit2: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("pc1-1", "fred1", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskDone("pc1-1"),
}))
t.Run("pc1-block-pc2", testFunc([]workerSpec{
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskStarted("pc1"),
2020-07-16 23:26:55 +00:00
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
2020-07-16 21:41:04 +00:00
taskNotScheduled("pc2"),
taskDone("pc1"),
taskDone("pc2"),
}))
t.Run("pc2-block-pc1", testFunc([]workerSpec{
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("pc2", "fred", 8, sealtasks.TTPreCommit2),
2020-07-16 21:41:04 +00:00
taskStarted("pc2"),
2020-07-16 23:26:55 +00:00
sched("pc1", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskNotScheduled("pc1"),
taskDone("pc2"),
taskDone("pc1"),
}))
t.Run("pc1-batching", testFunc([]workerSpec{
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}}},
2020-07-16 21:41:04 +00:00
}, []task{
2020-07-16 23:26:55 +00:00
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskStarted("t1"),
2020-07-16 23:26:55 +00:00
sched("t2", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskStarted("t2"),
// with worker settings, we can only run 2 parallel PC1s
// start 2 more to fill fetch buffer
2020-07-16 23:26:55 +00:00
sched("t3", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskNotScheduled("t3"),
2020-07-16 23:26:55 +00:00
sched("t4", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 21:41:04 +00:00
taskNotScheduled("t4"),
taskDone("t1"),
taskDone("t2"),
taskStarted("t3"),
taskStarted("t4"),
taskDone("t3"),
taskDone("t4"),
}))
2020-07-16 23:26:55 +00:00
twoPC1 := func(prefix string, sid abi.SectorNumber, schedAssert func(name string) task) task {
2020-07-16 21:41:04 +00:00
return multTask(
2020-07-16 23:26:55 +00:00
sched(prefix+"-a", "fred", sid, sealtasks.TTPreCommit1),
2020-07-16 21:41:15 +00:00
schedAssert(prefix+"-a"),
2020-07-16 21:41:04 +00:00
2020-07-16 23:32:49 +00:00
sched(prefix+"-b", "fred", sid+1, sealtasks.TTPreCommit1),
2020-07-16 21:41:15 +00:00
schedAssert(prefix+"-b"),
)
2020-07-16 21:41:04 +00:00
}
2020-07-16 23:26:55 +00:00
twoPC1Act := func(prefix string, schedAssert func(name string) task) task {
2020-07-16 21:41:04 +00:00
return multTask(
2020-07-16 23:26:55 +00:00
schedAssert(prefix+"-a"),
schedAssert(prefix+"-b"),
2020-07-16 21:41:15 +00:00
)
2020-07-16 21:41:04 +00:00
}
diag := func() task {
return func(t *testing.T, s *scheduler, index *stores.Index, meta *runMeta) {
time.Sleep(20 * time.Millisecond)
for _, request := range s.diag().Requests {
log.Infof("!!! sDIAG: sid(%d) task(%s)", request.Sector.Number, request.TaskType)
}
wj := (&Manager{sched: s}).WorkerJobs()
type line struct {
storiface.WorkerJob
wid uuid.UUID
}
lines := make([]line, 0)
for wid, jobs := range wj {
for _, job := range jobs {
lines = append(lines, line{
WorkerJob: job,
wid: wid,
})
}
}
// oldest first
sort.Slice(lines, func(i, j int) bool {
if lines[i].RunWait != lines[j].RunWait {
return lines[i].RunWait < lines[j].RunWait
}
return lines[i].Start.Before(lines[j].Start)
})
for _, l := range lines {
log.Infof("!!! wDIAG: rw(%d) sid(%d) t(%s)", l.RunWait, l.Sector.Number, l.Task)
}
}
}
2020-07-16 23:32:49 +00:00
// run this one a bunch of times, it had a very annoying tendency to fail randomly
for i := 0; i < 40; i++ {
2020-07-16 23:26:55 +00:00
t.Run("pc1-pc2-prio", testFunc([]workerSpec{
{name: "fred", resources: decentWorkerResources, taskTypes: map[sealtasks.TaskType]struct{}{sealtasks.TTPreCommit1: {}, sealtasks.TTPreCommit2: {}}},
2020-07-16 23:26:55 +00:00
}, []task{
2020-07-16 23:32:49 +00:00
// fill queues
2020-07-16 23:26:55 +00:00
twoPC1("w0", 0, taskStarted),
twoPC1("w1", 2, taskNotScheduled),
sched("w2", "fred", 4, sealtasks.TTPreCommit1),
taskNotScheduled("w2"),
2020-07-16 21:41:04 +00:00
2020-07-16 23:26:55 +00:00
// windowed
2020-07-16 21:41:04 +00:00
2020-07-16 23:32:49 +00:00
sched("t1", "fred", 8, sealtasks.TTPreCommit1),
2020-07-16 23:26:55 +00:00
taskNotScheduled("t1"),
2020-07-16 21:41:04 +00:00
2020-07-16 23:32:49 +00:00
sched("t2", "fred", 9, sealtasks.TTPreCommit1),
2020-07-16 23:26:55 +00:00
taskNotScheduled("t2"),
2020-07-16 21:41:04 +00:00
2020-07-16 23:32:49 +00:00
sched("t3", "fred", 10, sealtasks.TTPreCommit2),
2020-07-16 23:26:55 +00:00
taskNotScheduled("t3"),
2020-07-16 21:41:04 +00:00
diag(),
2020-07-16 23:26:55 +00:00
twoPC1Act("w0", taskDone),
twoPC1Act("w1", taskStarted),
taskNotScheduled("w2"),
2020-07-16 21:41:04 +00:00
2020-07-16 23:26:55 +00:00
twoPC1Act("w1", taskDone),
taskStarted("w2"),
taskDone("w2"),
diag(),
2020-07-16 21:41:04 +00:00
2020-07-16 23:26:55 +00:00
taskStarted("t3"),
taskNotScheduled("t1"),
taskNotScheduled("t2"),
taskDone("t3"),
taskStarted("t1"),
taskStarted("t2"),
taskDone("t1"),
taskDone("t2"),
}))
}
2020-07-16 21:41:04 +00:00
}
// slowishSelector is a WorkerSelector whose methods sleep briefly before
// answering; its boolean value is what Ok reports.
type slowishSelector bool
// Ok pauses for 200 microseconds and then reports the selector's own boolean
// value, regardless of the task, proof type or worker. The error is always nil.
func (s slowishSelector) Ok(ctx context.Context, task sealtasks.TaskType, spt abi.RegisteredSealProof, a *workerHandle) (bool, error) {
	const delay = 200 * time.Microsecond
	time.Sleep(delay)

	accepted := bool(s)
	return accepted, nil
}
// Cmp pauses for 100 microseconds and then always reports true with a nil
// error, whatever the two workers are.
func (s slowishSelector) Cmp(ctx context.Context, task sealtasks.TaskType, a, b *workerHandle) (bool, error) {
	const delay = 100 * time.Microsecond
	time.Sleep(delay)

	return true, nil
}
// Compile-time check that slowishSelector satisfies the WorkerSelector interface.
var _ WorkerSelector = slowishSelector(true)
func BenchmarkTrySched(b *testing.B) {
logging.SetAllLoggers(logging.LevelInfo)
defer logging.SetAllLoggers(logging.LevelDebug)
ctx := context.Background()
test := func(windows, queue int) func(b *testing.B) {
return func(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()
2020-11-05 06:34:24 +00:00
sched := newScheduler()
2021-11-29 13:42:20 +00:00
sched.workers[storiface.WorkerID{}] = &workerHandle{
2020-10-28 13:23:38 +00:00
workerRpc: nil,
info: storiface.WorkerInfo{
Hostname: "t",
Resources: decentWorkerResources,
},
preparing: &activeResources{},
active: &activeResources{},
}
for i := 0; i < windows; i++ {
sched.openWindows = append(sched.openWindows, &schedWindowRequest{
2021-11-29 13:42:20 +00:00
worker: storiface.WorkerID{},
done: make(chan *schedWindow, 1000),
})
}
for i := 0; i < queue; i++ {
sched.schedQueue.Push(&workerRequest{
taskType: sealtasks.TTCommit2,
sel: slowishSelector(true),
ctx: ctx,
})
}
b.StartTimer()
sched.trySched()
}
}
}
b.Run("1w-1q", test(1, 1))
b.Run("500w-1q", test(500, 1))
b.Run("1w-500q", test(1, 500))
b.Run("200w-400q", test(200, 400))
}
func TestWindowCompact(t *testing.T) {
2020-11-05 06:34:24 +00:00
sh := scheduler{}
spt := abi.RegisteredSealProof_StackedDrg32GiBV1
test := func(start [][]sealtasks.TaskType, expect [][]sealtasks.TaskType) func(t *testing.T) {
return func(t *testing.T) {
wh := &workerHandle{
info: storiface.WorkerInfo{
Resources: decentWorkerResources,
},
}
for _, windowTasks := range start {
window := &schedWindow{}
for _, task := range windowTasks {
2020-11-05 12:43:05 +00:00
window.todo = append(window.todo, &workerRequest{
taskType: task,
sector: storage.SectorRef{ProofType: spt},
})
2021-11-29 14:14:57 +00:00
window.allocated.add(wh.info.Resources, storiface.ResourceTable[task][spt])
}
wh.activeWindows = append(wh.activeWindows, window)
}
2020-10-28 13:34:28 +00:00
sw := schedWorker{
2020-10-28 14:10:43 +00:00
sched: &sh,
worker: wh,
2020-10-28 13:34:28 +00:00
}
sw.workerCompactWindows()
require.Equal(t, len(start)-len(expect), -sw.windowsRequested)
for wi, tasks := range expect {
var expectRes activeResources
for ti, task := range tasks {
require.Equal(t, task, wh.activeWindows[wi].todo[ti].taskType, "%d, %d", wi, ti)
2021-11-29 14:14:57 +00:00
expectRes.add(wh.info.Resources, storiface.ResourceTable[task][spt])
}
require.Equal(t, expectRes.cpuUse, wh.activeWindows[wi].allocated.cpuUse, "%d", wi)
require.Equal(t, expectRes.gpuUsed, wh.activeWindows[wi].allocated.gpuUsed, "%d", wi)
require.Equal(t, expectRes.memUsedMin, wh.activeWindows[wi].allocated.memUsedMin, "%d", wi)
require.Equal(t, expectRes.memUsedMax, wh.activeWindows[wi].allocated.memUsedMax, "%d", wi)
}
}
}
t.Run("2-pc1-windows", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
)
t.Run("1-window", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}}),
)
t.Run("2-pc2-windows", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit2}, {sealtasks.TTPreCommit2}}),
)
t.Run("2pc1-pc1ap", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1}}),
)
t.Run("2pc1-pc1appc2", test(
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1}, {sealtasks.TTPreCommit1, sealtasks.TTAddPiece, sealtasks.TTPreCommit2}},
[][]sealtasks.TaskType{{sealtasks.TTPreCommit1, sealtasks.TTPreCommit1, sealtasks.TTAddPiece}, {sealtasks.TTPreCommit1, sealtasks.TTPreCommit2}}),
)
}